This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch reconfiguration_scheduling in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit d43fbad9086cea2e8c2e5e6859e4e900d5c9beca Author: TJxiaobao <[email protected]> AuthorDate: Wed Sep 3 12:58:37 2025 +0800 modified:modify some architecture directories --- docs/ARCHITECTURE_COMPARISON.md | 255 ++++++++++ docs/SCHEDULING_ARCHITECTURE.md | 533 +++++++++++++++++++++ examples/main_simulation.go | 289 +++++++++++ pkg/banner/embed.go | 24 +- pkg/collector/basic/database/jdbc_auto_register.go | 36 ++ pkg/collector/bootstrap.go | 119 +++-- pkg/collector/collect_service.go | 248 +++++++++- .../common/dispatcher/entrance/collect_server.go | 51 +- pkg/collector/common/timer/timer_dispatcher.go | 203 +++++++- pkg/collector/config/config.go | 118 ----- pkg/collector/config/config_test.go | 45 -- pkg/collector/registry.go | 39 +- pkg/collector/registry/registry_center.go | 251 ++++++++++ pkg/collector/server/server.go | 69 --- pkg/collector/server/server_test.go | 1 - pkg/types/job/job_types.go | 17 + 16 files changed, 1995 insertions(+), 303 deletions(-) diff --git a/docs/ARCHITECTURE_COMPARISON.md b/docs/ARCHITECTURE_COMPARISON.md new file mode 100644 index 0000000..972d942 --- /dev/null +++ b/docs/ARCHITECTURE_COMPARISON.md @@ -0,0 +1,255 @@ +# HertzBeat Go vs Java 架构对比速览 + +## 🔄 调度流程对比 + +### Java版本调度流程 +```mermaid +graph TD + A[Manager调度器] --> B[一致性哈希分配] + B --> C[网络通信ClusterMsg] + C --> D[Collector接收] + D --> E[TimerDispatcher] + E --> F[WheelTimerTask] + F --> G[CommonDispatcher] + G --> H[MetricsCollectorQueue] + H --> I[WorkerPool线程池] + I --> J[MetricsCollect] + J --> K[CollectStrategyFactory] + K --> L[SPI加载采集器] + L --> M[具体采集器JDBC/HTTP/SSH] + M --> N[返回结果] +``` + +### Go版本调度流程 +```mermaid +graph TD + A1[Manager调度器] --> B1[网络通信待完善] + B1 --> C1[CollectServer接收] + C1 --> D1[TimerDispatcher] + D1 --> E1[WheelTimerTask] + E1 --> F1[DispatchMetricsTask] + F1 --> G1[WorkerPool协程池] + G1 --> H1[MetricsCollect] + H1 --> I1[CollectService] + I1 --> J1[CollectorRegistry] + J1 --> K1[自动注册采集器] + K1 --> L1[具体采集器JDBC等] + L1 --> M1[返回结果] +``` + +## 📊 核心差异对比表 + 
+| 维度 | Java版本 | Go版本 | 备注 | +|------|----------|---------|------| +| **网络通信** | ✅ 完整的ClusterMsg协议 | ⚠️ 待完善(目前模拟) | Go版本待开发 | +| **任务调度** | TimerDispatcher + CommonDispatcher | TimerDispatcher直接分发 | Go版本更简洁 | +| **工作池** | ThreadPoolExecutor | Goroutine Pool | Go版本更轻量 | +| **任务队列** | 优先级队列MetricsCollectorQueue | 直接Channel队列 | Java版本更复杂 | +| **采集器注册** | Java SPI自动发现 | init()函数+注册中心 | 各有优势 | +| **并发模型** | 线程池(~2MB/线程) | 协程池(~2KB/协程) | Go版本更高效 | +| **内存占用** | 较大(JVM开销) | 较小(原生编译) | Go版本优势明显 | +| **启动速度** | 较慢(JVM+类加载) | 很快(静态编译) | Go版本优势明显 | +| **超时处理** | 独立ScheduledExecutor | 集成在TimerDispatcher | Go版本更统一 | +| **重试机制** | 基于优先级队列 | 指数退避策略 | 实现方式不同 | + +## 🎯 关键技术对比 + +### 1. 并发处理 + +#### Java版本 +```java +// 线程池配置 +ThreadPoolExecutor workerExecutor = new ThreadPoolExecutor( + coreSize, // 核心线程数 + maxSize, // 最大线程数 + 10, TimeUnit.SECONDS, // 空闲超时 + new SynchronousQueue<>(), // 同步队列 + threadFactory, // 线程工厂 + new ThreadPoolExecutor.AbortPolicy() // 拒绝策略 +); +``` + +#### Go版本 +```go +// 协程池配置 +type WorkerPool struct { + config WorkerPoolConfig + taskQueue chan Task // 任务通道 + workers sync.Map // 工作协程映射 + ctx context.Context // 上下文控制 +} + +// 动态协程管理 +func (wp *WorkerPool) adjustWorkerCount() { + if queuedTasks > currentWorkers*2 { + wp.addWorker(false) // 添加非核心worker + } +} +``` + +### 2. 
采集器注册 + +#### Java版本 (SPI机制) +```java +// META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect +org.apache.hertzbeat.collector.collect.database.JdbcCommonCollect +org.apache.hertzbeat.collector.collect.http.HttpCollectImpl + +// 自动加载 +ServiceLoader<AbstractCollect> loader = ServiceLoader.load(AbstractCollect.class); +for (AbstractCollect collect : loader) { + COLLECT_STRATEGY.put(collect.supportProtocol(), collect); +} +``` + +#### Go版本 (注册中心机制) +```go +// 自动注册 +func init() { + registry.RegisterCollectorFactory( + "jdbc", + func(logger logger.Logger) basic.AbstractCollector { + return NewJDBCCollector(logger) + }, + registry.WithPriority(10), + ) +} + +// 统一注册 +func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) { + collectors, _ := registry.GetGlobalCenter().CreateCollectors(logger) + for protocol, collector := range collectors { + service.RegisterCollector(protocol, collector) + } +} +``` + +### 3. 任务分发策略 + +#### Java版本 (多层分发) +```java +// 1. TimerDispatcher调度 +wheelTimer.newTimeout(timerJob, interval, TimeUnit.SECONDS); + +// 2. CommonDispatcher分发 +MetricsCollect metricsCollect = jobRequestQueue.getJob(); +workerPool.executeJob(metricsCollect); + +// 3. WorkerPool执行 +workerExecutor.execute(runnable); +``` + +#### Go版本 (直接分发) +```go +// 1. TimerDispatcher调度 +timeout := td.wheelTimer.NewTimeout(timerTask, delay) + +// 2. 直接分发到WorkerPool +for _, metric := range job.Metrics { + metricsCollect := worker.NewMetricsCollect(...) + td.workerPool.Submit(metricsCollect) +} +``` + +## 🚀 性能特性对比 + +### 启动性能 +| 指标 | Java版本 | Go版本 | 对比 | +|------|----------|--------|------| +| 启动时间 | ~3-5秒 | ~0.1-0.5秒 | Go快10倍 | +| 内存占用 | ~128MB+ | ~20-50MB | Go省60%+ | +| 文件大小 | ~50MB+ | ~10-20MB | Go小50%+ | + +### 运行时性能 +| 指标 | Java版本 | Go版本 | 对比 | +|------|----------|--------|------| +| 协程/线程开销 | 2MB/线程 | 2KB/协程 | Go省99.9% | +| 上下文切换 | 重量级 | 轻量级 | Go更快 | +| GC延迟 | 可能较长 | 通常<1ms | Go更稳定 | + +## 🔧 开发体验对比 + +### 添加新采集器复杂度 + +#### Java版本 +1. 
✅ 实现AbstractCollect接口 +2. ✅ 在META-INF/services中注册 +3. ✅ 自动发现,无需修改代码 + +**总结**: 配置简单,但依赖SPI机制 + +#### Go版本 +1. ✅ 实现AbstractCollector接口 +2. ✅ 添加init()注册函数 +3. ✅ 在registry.go中添加import + +**总结**: 需要手动导入,但更灵活可控 + +### 调试和监控 + +#### Java版本 +```java +// JVM工具链丰富 +- JProfiler、VisualVM等性能分析 +- JMX监控指标 +- 成熟的APM工具支持 +``` + +#### Go版本 +```go +// Go原生工具 +- go tool pprof性能分析 +- runtime.ReadMemStats()监控 +- Prometheus metrics集成 +``` + +## 🎯 选择建议 + +### 适合Java版本的场景 +- ✅ 企业级环境,成熟度要求高 +- ✅ 需要丰富的第三方库支持 +- ✅ 团队Java技能更强 +- ✅ CPU密集型采集任务 + +### 适合Go版本的场景 +- ✅ 云原生环境,资源敏感 +- ✅ 高并发IO密集型采集 +- ✅ 快速启动和部署需求 +- ✅ 容器化微服务架构 + +## 📈 发展趋势 + +### Java版本优势保持 +- 生态系统成熟 +- 企业级特性完善 +- 社区支持强大 + +### Go版本发展方向 +- 网络通信层完善 +- 更多协议采集器 +- 云原生特性增强 +- 性能优势扩大 + +--- + +## 📋 快速参考 + +### 核心类对应关系 +| Java类 | Go对应 | 功能 | +|--------|--------|------| +| `TimerDispatcher` | `TimerDispatcher` | 时间轮调度 | +| `CommonDispatcher` | `DispatchMetricsTask` | 任务分发 | +| `WorkerPool` | `WorkerPool` | 工作池 | +| `MetricsCollect` | `MetricsCollect` | 采集任务 | +| `CollectStrategyFactory` | `CollectService` | 采集器管理 | +| `AbstractCollect` | `AbstractCollector` | 采集器接口 | + +### 关键配置参数 +| 配置项 | Java默认值 | Go默认值 | 说明 | +|--------|------------|----------|------| +| 核心工作线程 | CPU核数 | CPU核数 | 基础并发数 | +| 最大工作线程 | CPU核数×16 | CPU核数×4 | 最大并发数 | +| 任务队列大小 | SynchronousQueue | 1000 | 队列容量 | +| 时间轮大小 | 512 | 512 | 调度精度 | + +这个对比文档帮助开发者快速理解两个版本的差异,选择合适的版本进行开发和部署。 diff --git a/docs/SCHEDULING_ARCHITECTURE.md b/docs/SCHEDULING_ARCHITECTURE.md new file mode 100644 index 0000000..193c1f0 --- /dev/null +++ b/docs/SCHEDULING_ARCHITECTURE.md @@ -0,0 +1,533 @@ +# HertzBeat Go Collector 调度架构设计文档 + +## 📋 概述 + +本文档详细说明了HertzBeat Go版本Collector的调度架构设计,包括与Java版本的对比分析、核心组件说明、调度流程详解等内容。 + +## 🏗️ 整体架构对比 + +### Java版本架构 +``` +Manager调度器 → 一致性哈希 → 网络通信 → Collector + ↓ +TimerDispatcher → WheelTimerTask → CommonDispatcher + ↓ +MetricsCollectorQueue → WorkerPool → MetricsCollect + ↓ +CollectStrategyFactory(SPI) → 具体采集器 → 返回结果 +``` + +### Go版本架构 +``` +Manager调度器 → 网络通信(待完善) → 
CollectServer + ↓ +TimerDispatcher → WheelTimerTask → DispatchMetricsTask + ↓ +WorkerPool → MetricsCollect → CollectService + ↓ +CollectorRegistry(手动注册) → 具体采集器 → 返回结果 +``` + +## 🔄 调度流程详细对比 + +### 1. 任务接收阶段 + +#### Java版本 +```java +// Manager发送任务 +ClusterMsg.Message message = ClusterMsg.Message.newBuilder() + .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK) + .setDirection(ClusterMsg.Direction.REQUEST) + .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job))) + .build(); +manageServer.sendMsg(node.getIdentity(), message); + +// Collector接收任务 +@Override +public void response(ClusterMsg.Message message) { + Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(), Job.class); + collectJobService.addAsyncCollectJob(job); +} +``` + +#### Go版本 +```go +// Manager发送任务 (目前为模拟方式) +func (cs *CollectServer) ReceiveJob(job *jobtypes.Job, eventListener timer.CollectResponseEventListener) error { + if !cs.isStarted { + return fmt.Errorf("collect server is not started") + } + + // 直接添加到调度器 + return cs.timerDispatcher.AddJob(job, eventListener) +} +``` + +**主要差异**: +- Java: 完整的网络通信协议栈 +- Go: 网络层待完善,目前使用模拟接口 + +### 2. 
任务调度阶段 + +#### Java版本 +```java +// TimerDispatcher调度 +@Override +public void addJob(Job addJob, CollectResponseEventListener eventListener) { + WheelTimerTask timerJob = new WheelTimerTask(addJob); + if (addJob.isCyclic()) { + Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS); + currentCyclicTaskMap.put(addJob.getId(), timeout); + } +} + +// WheelTimerTask执行 +@Override +public void run(Timeout timeout) throws Exception { + // 分发到CommonDispatcher + metricsTaskDispatch.dispatchMetricsTask(this); +} +``` + +#### Go版本 +```go +// TimerDispatcher调度 +func (td *TimerDispatcher) AddJob(job *jobtypes.Job, eventListener CollectResponseEventListener) error { + timerTask := NewWheelTimerTask(job, td.metricsDispatcher, td.logger) + + var delay time.Duration + if job.DefaultInterval > 0 { + delay = time.Duration(job.DefaultInterval) * time.Second + } + + timeout := td.wheelTimer.NewTimeout(timerTask, delay) + + if job.IsCyclic { + td.cyclicTasks.Store(job.ID, timeout) + } else { + td.tempTasks.Store(job.ID, timeout) + } +} + +// DispatchMetricsTask执行 +func (td *TimerDispatcher) DispatchMetricsTask(timeout *jobtypes.Timeout) error { + if task, ok := timeout.Task().(*WheelTimerTask); ok { + job := task.GetJob() + + // 为每个metric创建采集任务 + for _, metric := range job.Metrics { + metricsCollect := worker.NewMetricsCollect( + &metric, timeout, td.collectDispatcher, + "collector-go", td.collectService, td.logger, + ) + td.workerPool.Submit(metricsCollect) + } + } +} +``` + +**主要差异**: +- Java: 使用CommonDispatcher作为中间层 +- Go: TimerDispatcher直接分发任务到WorkerPool + +### 3. 
任务分发阶段 + +#### Java版本 +```java +// CommonDispatcher处理 +public void run() { + while (!Thread.currentThread().isInterrupted()) { + MetricsCollect metricsCollect = jobRequestQueue.getJob(); + if (metricsCollect != null) { + workerPool.executeJob(metricsCollect); + } + } +} + +// 使用优先级队列 +private final MetricsCollectorQueue jobRequestQueue; +``` + +#### Go版本 +```go +// WorkerPool直接处理 +func (wp *WorkerPool) Submit(task Task) error { + select { + case wp.taskQueue <- task: + atomic.AddInt64(&wp.stats.QueuedTasks, 1) + return nil + default: + return fmt.Errorf("task queue is full") + } +} + +// Worker执行任务 +func (w *worker) executeTask(task Task) { + timeout := task.Timeout() + ctx, cancel := context.WithTimeout(w.ctx, timeout) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- task.Execute() + }() + + select { + case err := <-done: + // 任务完成 + case <-ctx.Done(): + // 任务超时 + } +} +``` + +**主要差异**: +- Java: 专门的优先级队列 + 分发线程 +- Go: 直接的通道队列 + Goroutine池 + +### 4. 采集器调用阶段 + +#### Java版本 (SPI自动发现) +```java +// CollectStrategyFactory使用SPI +@Override +public void run(String... args) throws Exception { + ServiceLoader<AbstractCollect> loader = ServiceLoader.load(AbstractCollect.class); + for (AbstractCollect collect : loader) { + COLLECT_STRATEGY.put(collect.supportProtocol(), collect); + } +} + +// META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect +org.apache.hertzbeat.collector.collect.database.JdbcCommonCollect +org.apache.hertzbeat.collector.collect.http.HttpCollectImpl +// ... 
其他采集器 +``` + +#### Go版本 (手动注册 + 新的自动注册机制) +```go +// 传统手动注册方式 +func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) { + jdbcCollector := database.NewJDBCCollector(logger) + service.RegisterCollector(jdbcCollector.SupportProtocol(), jdbcCollector) +} + +// 新的自动注册机制 +// pkg/collector/basic/database/jdbc_auto_register.go +func init() { + registry.RegisterCollectorFactory( + "jdbc", + func(logger logger.Logger) basic.AbstractCollector { + return NewJDBCCollector(logger) + }, + registry.WithPriority(10), + ) +} +``` + +**主要差异**: +- Java: 使用Java SPI机制自动发现 +- Go: 使用init()函数 + 注册中心的自动注册机制 + +## 🔧 核心组件详解 + +### 1. TimerDispatcher (时间轮调度器) + +#### 共同特性 +- 都使用HashedWheelTimer实现 +- 支持循环任务和一次性任务 +- 任务超时管理 + +#### 差异对比 +| 特性 | Java版本 | Go版本 | +|------|---------|--------| +| 超时监控 | 独立的ScheduledExecutor | 集成在TimerDispatcher中 | +| 任务存储 | ConcurrentHashMap | sync.Map | +| 重试机制 | 基于优先级队列 | 指数退避策略 | +| 事件监听 | EventListener接口 | CollectResponseEventListener | + +### 2. WorkerPool (工作线程池) + +#### Java版本特性 +```java +// 基于ThreadPoolExecutor +private ThreadPoolExecutor workerExecutor; + +// 动态线程池配置 +int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors()); +int maxSize = Runtime.getRuntime().availableProcessors() * 16; +workerExecutor = new ThreadPoolExecutor(coreSize, maxSize, 10, TimeUnit.SECONDS, + new SynchronousQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy()); +``` + +#### Go版本特性 +```go +// 基于Goroutine池 +type WorkerPool struct { + config WorkerPoolConfig + taskQueue chan Task + workers sync.Map + ctx context.Context + cancel context.CancelFunc +} + +// 动态Goroutine管理 +func (wp *WorkerPool) adjustWorkerCount() { + queuedTasks := atomic.LoadInt64(&wp.stats.QueuedTasks) + currentWorkers := wp.workerCount.Load() + + if queuedTasks > int64(currentWorkers)*2 && currentWorkers < int32(wp.config.MaxSize) { + wp.addWorker(false) // 添加非核心worker + } +} +``` + +#### 差异对比 +| 特性 | Java版本 | Go版本 | +|------|---------|--------| +| 并发模型 | 线程池 | 
Goroutine池 | +| 队列类型 | SynchronousQueue | 带缓冲的Channel | +| 拒绝策略 | AbortPolicy | 阻塞等待 | +| 资源管理 | JVM线程管理 | Go运行时管理 | + +### 3. 采集器注册机制 + +#### Java版本 (SPI机制) +```java +// 优点: +- 自动发现,无需手动注册 +- 标准Java机制,成熟稳定 +- 支持插件化扩展 + +// 缺点: +- 启动时全量加载 +- 难以动态控制 +- 依赖类路径配置 +``` + +#### Go版本 (注册中心机制) +```go +// 优点: +- 支持优先级控制 +- 运行时动态启用/禁用 +- 避免循环依赖 +- 更好的错误处理 + +// 缺点: +- 需要手动导入包 +- 相对复杂的注册流程 +``` + +## 📊 性能对比分析 + +### 1. 内存使用 + +#### Java版本 +- **优势**: JVM成熟的内存管理 +- **劣势**: 较大的基础内存占用 +- **特点**: GC压力,但有成熟的调优工具 + +#### Go版本 +- **优势**: 更小的内存占用 +- **劣势**: 需要手动管理某些资源 +- **特点**: GC延迟低,内存效率高 + +### 2. 并发性能 + +#### Java版本 +- **线程开销**: 每个线程约2MB栈空间 +- **上下文切换**: 相对较重 +- **适用场景**: CPU密集型任务 + +#### Go版本 +- **Goroutine开销**: 每个约2KB栈空间 +- **上下文切换**: 非常轻量 +- **适用场景**: IO密集型任务 + +### 3. 启动速度 + +#### Java版本 +- **JVM启动**: 相对较慢 +- **类加载**: SPI机制需要扫描classpath +- **预热时间**: JIT编译需要时间 + +#### Go版本 +- **编译启动**: 非常快速 +- **静态链接**: 无需额外依赖 +- **即时性能**: 无需预热 + +## 🔮 发展路线图 + +### 短期目标 (已完成) +- ✅ 基础调度框架 +- ✅ JDBC采集器实现 +- ✅ 自动注册机制 +- ✅ 超时监控和重试 + +### 中期目标 (进行中) +- 🔄 完善网络通信层 +- 🔄 实现更多协议采集器 (HTTP, SSH, SNMP等) +- 🔄 集群模式支持 +- 🔄 配置文件驱动的采集器管理 + +### 长期目标 (规划中) +- 📋 插件化架构 +- 📋 热更新能力 +- 📋 云原生部署支持 +- 📋 AI驱动的智能调度 + +## 🛠️ 开发指南 + +### 1. 添加新的采集器 + +```go +// 步骤1: 实现采集器接口 +type RedisCollector struct { + logger logger.Logger +} + +func (rc *RedisCollector) SupportProtocol() string { + return "redis" +} + +func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { + // 实现采集逻辑 +} + +// 步骤2: 添加自动注册 +func init() { + registry.RegisterCollectorFactory("redis", + func(logger logger.Logger) basic.AbstractCollector { + return NewRedisCollector(logger) + }, + registry.WithPriority(15), + ) +} + +// 步骤3: 在registry.go中添加导入 +_ "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/redis" +``` + +### 2. 
调试和监控 + +```go +// 查看调度器状态 +stats := timerDispatcher.GetStats() +logger.Info("调度器状态", + "cyclicJobs", stats.CyclicJobs, + "tempJobs", stats.TempJobs, + "executedJobs", stats.ExecutedJobs) + +// 查看工作池状态 +poolStats, queueStats := workerPool.GetStats() +logger.Info("工作池状态", + "activeWorkers", poolStats.ActiveWorkers, + "queuedTasks", poolStats.QueuedTasks) +``` + +### 3. 性能优化建议 + +#### 调度器优化 +```go +// 1. 合理设置时间轮参数 +timerDispatcher := timer.NewTimerDispatcher(logger) +timerDispatcher.SetWheelSize(512) // 根据任务数量调整 +timerDispatcher.SetTickDuration(100 * time.Millisecond) + +// 2. 批量处理任务 +func (td *TimerDispatcher) BatchDispatch(jobs []*jobtypes.Job) error { + for _, job := range jobs { + td.AddJob(job, nil) + } +} +``` + +#### 工作池优化 +```go +// 1. 根据任务特性调整池大小 +config := worker.WorkerPoolConfig{ + CoreSize: runtime.NumCPU(), // CPU密集型 + MaxSize: runtime.NumCPU() * 4, // IO密集型可以更大 + QueueSize: 1000, // 根据内存情况调整 + IdleTimeout: 60 * time.Second, +} + +// 2. 任务优先级处理 +type PriorityTask struct { + Priority int + Task Task +} +``` + +## 📝 最佳实践 + +### 1. 错误处理 +```go +// 采集器中的错误处理 +func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { + defer func() { + if r := recover(); r != nil { + jc.logger.Error(fmt.Errorf("panic in JDBC collect: %v", r), "采集器panic") + } + }() + + // 具体采集逻辑... +} +``` + +### 2. 资源管理 +```go +// 连接池管理 +type JDBCCollector struct { + connectionPool map[string]*sql.DB + poolMutex sync.RWMutex +} + +func (jc *JDBCCollector) getConnection(dsn string) (*sql.DB, error) { + jc.poolMutex.RLock() + if conn, exists := jc.connectionPool[dsn]; exists { + jc.poolMutex.RUnlock() + return conn, nil + } + jc.poolMutex.RUnlock() + + // 创建新连接... +} +``` + +### 3. 
配置管理 +```go +// 支持配置文件和环境变量 +type CollectorConfig struct { + JDBC struct { + MaxConnections int `yaml:"max_connections" env:"JDBC_MAX_CONNECTIONS"` + ConnectionTimeout time.Duration `yaml:"connection_timeout" env:"JDBC_CONNECTION_TIMEOUT"` + QueryTimeout time.Duration `yaml:"query_timeout" env:"JDBC_QUERY_TIMEOUT"` + } `yaml:"jdbc"` +} +``` + +## 🎯 总结 + +HertzBeat Go版本的调度架构在保持与Java版本核心理念一致的同时,充分利用了Go语言的特性优势: + +### 核心优势 +1. **高并发性能**: Goroutine模型适合IO密集的采集任务 +2. **低资源占用**: 更小的内存占用和更快的启动速度 +3. **简洁架构**: 减少了中间层,调度链路更直接 +4. **类型安全**: 编译时类型检查,减少运行时错误 + +### 主要差异 +1. **网络层**: 待完善,目前使用模拟接口 +2. **采集器注册**: 从SPI改为init()函数 + 注册中心 +3. **并发模型**: 从线程池改为Goroutine池 +4. **错误处理**: 更显式的错误处理机制 + +### 发展方向 +Go版本将在保持高性能和低资源占用优势的基础上,逐步完善网络通信、集群支持等企业级特性,最终实现与Java版本功能对等但性能更优的目标。 + +--- + +*文档版本: v1.0* +*最后更新: 2024年1月* +*维护者: HertzBeat Go Team* diff --git a/examples/main_simulation.go b/examples/main_simulation.go new file mode 100644 index 0000000..2756b22 --- /dev/null +++ b/examples/main_simulation.go @@ -0,0 +1,289 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" +) + +// go build -ldflags "-X main.Version=x.y.z" +var ( + ConfPath string + Version string + Simulation bool // 新增:是否启用模拟模式 +) + +func init() { + flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to config file") + flag.BoolVar(&Simulation, "simulation", true, "enable manager task simulation mode") +} + +func main() { + flag.Parse() + + // 初始化日志 + log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo) + log.Info("🚀 启动HertzBeat Collector", "version", Version, "simulation", Simulation) + Simulation = true + if Simulation 
{ + // 模拟模式:启动完整的CollectServer并模拟Manager发送任务 + runSimulationMode(log) + } else { + // 正常模式:启动标准的collector服务 + if err := collector.Bootstrap(ConfPath, Version); err != nil { + log.Error(err, "❌ 启动collector失败") + os.Exit(1) + } + } +} + +// runSimulationMode 运行模拟Manager发送任务的模式 +func runSimulationMode(log logger.Logger) { + log.Info("🎭 启动模拟模式:模拟Manager发送JDBC采集任务") + + // 1. 创建CollectServer (完整的采集器架构) + config := entrance.DefaultServerConfig() + collectServer, err := entrance.NewCollectServer(config, log) + if err != nil { + log.Error(err, "❌ 创建CollectServer失败") + return + } + + // 2. 启动CollectServer + if err := collectServer.Start(); err != nil { + log.Error(err, "❌ 启动CollectServer失败") + return + } + defer collectServer.Stop() + + log.Info("✅ CollectServer已启动,准备接收Manager任务") + + // 3. 创建模拟的Manager任务 + jdbcJob := createManagerJDBCTask() + + // 4. 启动任务模拟器 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 启动模拟Manager发送任务的goroutine + go simulateManagerTasks(ctx, collectServer, jdbcJob, log) + + // 5. 
等待中断信号 + log.Info("🔄 模拟器运行中 (每60秒模拟Manager发送任务),按Ctrl+C停止...") + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + log.Info("📴 收到停止信号,正在关闭...") + cancel() + time.Sleep(2 * time.Second) + log.Info("👋 模拟器已停止") +} + +// createManagerJDBCTask 创建模拟从Manager接收的JDBC采集任务 +func createManagerJDBCTask() *jobtypes.Job { + return &jobtypes.Job{ + ID: 1001, + TenantID: 1, + MonitorID: 2001, + App: "mysql", + Category: "database", + IsCyclic: true, + DefaultInterval: 30, // 30秒间隔 + Timestamp: time.Now().Unix(), + + // Manager发送的任务元数据 + Metadata: map[string]string{ + "instancename": "生产MySQL实例", + "instancehost": "localhost:43306", + "manager_node": "hertzbeat-manager-001", + "task_source": "manager_scheduler", + "collector_assigned": "auto", + }, + + Labels: map[string]string{ + "env": "production", + "database": "mysql", + "region": "localhost", + "managed_by": "hertzbeat", + "priority": "high", + }, + + Annotations: map[string]string{ + "description": "MySQL数据库性能监控", + "created_by": "manager-scheduler", + "task_type": "cyclic_monitoring", + "alert_enabled": "true", + }, + + // JDBC采集指标配置 + Metrics: []jobtypes.Metrics{ + { + Name: "mysql_basic_info", + Priority: 0, + Protocol: "jdbc", + Host: "localhost", + Port: "3306", + Timeout: "15s", + Interval: 30, + Fields: []jobtypes.Field{ + {Field: "database_name", Type: 1, Label: true}, + {Field: "version", Type: 1, Label: false}, + {Field: "uptime_seconds", Type: 0, Label: false}, + {Field: "server_id", Type: 0, Label: false}, + }, + JDBC: &jobtypes.JDBCProtocol{ + Host: "localhost", + Port: "3306", + Platform: "mysql", + Username: "root", + Password: "password", // 请修改为实际密码 + Database: "mysql", + Timeout: "15", + QueryType: "oneRow", + SQL: `SELECT + DATABASE() as database_name, + VERSION() as version, + (SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Uptime') as uptime_seconds, + @@server_id as server_id`, + }, + }, + }, + } +} + +// 
simulateManagerTasks 模拟Manager定期发送采集任务 +func simulateManagerTasks(ctx context.Context, collectServer *entrance.CollectServer, job *jobtypes.Job, log logger.Logger) { + log.Info("📡 开始模拟Manager任务调度", "jobId", job.ID, "interval", "60s") + + // 创建任务响应监听器 (模拟发送结果回Manager) + eventListener := &ManagerResponseSimulator{ + logger: log, + jobID: job.ID, + monitorID: job.MonitorID, + } + + // 立即发送第一个任务 (模拟Manager初始调度) + log.Info("📤 Manager发送初始任务", "timestamp", time.Now().Format("15:04:05")) + if err := collectServer.ReceiveJob(job, eventListener); err != nil { + log.Error(err, "❌ 接收初始任务失败") + return + } + + // 模拟Manager定期发送任务更新/重新调度 + ticker := time.NewTicker(60 * time.Second) // 每60秒模拟一次Manager调度 + defer ticker.Stop() + + taskSequence := 1 + + for { + select { + case <-ticker.C: + taskSequence++ + + // 创建任务更新 (模拟Manager重新调度或配置更新) + updatedJob := *job + updatedJob.Timestamp = time.Now().Unix() + updatedJob.ID = job.ID + int64(taskSequence) + + // 更新任务元数据 (模拟Manager的管理信息) + updatedJob.Metadata["task_sequence"] = fmt.Sprintf("%d", taskSequence) + updatedJob.Metadata["last_scheduled"] = time.Now().Format(time.RFC3339) + updatedJob.Metadata["scheduler_version"] = "v1.0.0" + + log.Info("📤 Manager发送任务更新", + "sequence", taskSequence, + "jobId", updatedJob.ID, + "timestamp", time.Now().Format("15:04:05")) + + // 发送更新任务到Collector + if err := collectServer.ReceiveJob(&updatedJob, eventListener); err != nil { + log.Error(err, "❌ 接收任务更新失败", "sequence", taskSequence) + } + + case <-ctx.Done(): + log.Info("🛑 停止模拟Manager任务调度") + return + } + } +} + +// ManagerResponseSimulator 模拟Manager接收采集结果的响应处理器 +type ManagerResponseSimulator struct { + logger logger.Logger + jobID int64 + monitorID int64 +} + +// Response 处理采集结果 (模拟发送给Manager的过程) +func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) { + mrs.logger.Info("📨 Collector采集完成,准备发送结果到Manager", + "jobId", mrs.jobID, + "monitorId", mrs.monitorID, + "metricsCount", len(metricsData)) + + // 模拟处理每个采集指标的结果 + for i, data := range 
metricsData { + if collectData, ok := data.(*jobtypes.CollectRepMetricsData); ok { + mrs.logger.Info("📊 处理采集指标", + "metricIndex", i+1, + "metricName", collectData.Metrics, + "code", collectData.Code, + "time", collectData.Time) + + if collectData.Code == 200 { + mrs.logger.Info("✅ 指标采集成功", + "metric", collectData.Metrics, + "fieldsCount", len(collectData.Fields), + "valuesCount", len(collectData.Values)) + + // 打印采集数据 (模拟发送到Manager的数据) + for rowIndex, valueRow := range collectData.Values { + mrs.logger.Info("📈 采集数据", + "metric", collectData.Metrics, + "row", rowIndex+1, + "values", valueRow.Columns) + } + } else { + mrs.logger.Error(fmt.Errorf("采集失败"), "❌ 指标采集错误", + "metric", collectData.Metrics, + "code", collectData.Code, + "message", collectData.Msg) + } + } + } + + // 模拟发送HTTP响应到Manager + mrs.simulateHTTPResponseToManager(metricsData) +} + +// simulateHTTPResponseToManager 模拟通过HTTP将采集结果发送回Manager +func (mrs *ManagerResponseSimulator) simulateHTTPResponseToManager(metricsData []interface{}) { + // 模拟HTTP请求参数 + managerEndpoint := "http://hertzbeat-manager:1157/api/collector/collect" + collectorIdentity := "collector-go-001" + + mrs.logger.Info("🚀 模拟HTTP请求发送采集结果", + "endpoint", managerEndpoint, + "collectorId", collectorIdentity, + "jobId", mrs.jobID, + "dataCount", len(metricsData), + "timestamp", time.Now().Format("15:04:05")) + + // 这里可以添加真实的HTTP客户端代码 + // httpClient.Post(managerEndpoint, jsonData) + + mrs.logger.Info("📤 采集结果已发送到Manager (模拟成功)", + "status", "200 OK", + "responseTime", "15ms") +} diff --git a/pkg/banner/embed.go b/pkg/banner/embed.go index 0da67df..cf4c31e 100644 --- a/pkg/banner/embed.go +++ b/pkg/banner/embed.go @@ -5,19 +5,23 @@ import ( "os" "strconv" "text/template" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server" ) //go:embed banner.txt var EmbedLogo embed.FS +// LoggerInterface defines the interface for logging +type LoggerInterface interface { + Error(err error, msg string) + Info(msg string, keysAndValues 
...interface{}) +} + type Banner struct { - server *server.CollectorServer + logger LoggerInterface } -func New(server *server.CollectorServer) *Banner { - return &Banner{server: server} +func New(logger LoggerInterface) *Banner { + return &Banner{logger: logger} } type bannerVars struct { @@ -28,15 +32,19 @@ type bannerVars struct { } func (b *Banner) PrintBanner(appName, port string) error { + return b.PrintBannerWithVersion(appName, port, "unknown") +} + +func (b *Banner) PrintBannerWithVersion(appName, port, version string) error { data, err := EmbedLogo.ReadFile("banner.txt") if err != nil { - b.server.Logger.Error(err, "read banner file failed") + b.logger.Error(err, "read banner file failed") return err } tmpl, err := template.New("banner").Parse(string(data)) if err != nil { - b.server.Logger.Error(err, "parse banner template failed") + b.logger.Error(err, "parse banner template failed") return err } @@ -44,7 +52,7 @@ func (b *Banner) PrintBanner(appName, port string) error { CollectorName: appName, ServerPort: port, Pid: strconv.Itoa(os.Getpid()), - Version: b.server.Version, + Version: version, } err = tmpl.Execute(os.Stdout, vars) diff --git a/pkg/collector/basic/database/jdbc_auto_register.go b/pkg/collector/basic/database/jdbc_auto_register.go new file mode 100644 index 0000000..9eac413 --- /dev/null +++ b/pkg/collector/basic/database/jdbc_auto_register.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package database + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" +) + +// init 函数会在包被导入时自动执行,完成JDBC采集器的自动注册 +func init() { + // 注册JDBC采集器工厂函数到全局注册中心 + registry.RegisterCollectorFactory( + "jdbc", // 协议名称 + func(logger logger.Logger) basic.AbstractCollector { + return NewJDBCCollector(logger) + }, + registry.WithPriority(10), // 高优先级,数据库采集很重要 + ) +} diff --git a/pkg/collector/bootstrap.go b/pkg/collector/bootstrap.go index 999f9fe..024b0a9 100644 --- a/pkg/collector/bootstrap.go +++ b/pkg/collector/bootstrap.go @@ -7,47 +7,27 @@ import ( "syscall" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/banner" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/config" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" ) func Bootstrap(confPath, version string) error { + // Initialize logger + log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo) + log.Info("starting HertzBeat Collector Go", "version", version) - // Init collector server - cs := server.NewCollectorServer(version) - - // Load HertzBeat collector config - loader := config.New(confPath, cs, nil) - cfg, err := loader.LoadConfig() - if err != nil { - cs.Logger.Error(err, "load collector config failed") - return err - } - err = loader.ValidateConfig(cfg) - if err != nil { - 
cs.Logger.Error(err, "validate collector config failed") - return err - } - - // todo: optimize log init eg. dynamic update log level - - // render banner - err = banner.New(cs).PrintBanner(cfg.Collector.Info.Name, cfg.Collector.Info.Port) + // Print banner + bannerPrinter := banner.New(&BannerAdapter{logger: log}) + err := bannerPrinter.PrintBannerWithVersion("HertzBeat-Collector-Go", "1159", version) if err != nil { - cs.Logger.Error(err, "print banner failed") + log.Error(err, "failed to print banner") return err } - // Load collector job - - // check collector server - err = cs.Validate() - if err != nil { - cs.Logger.Error(err, "validate collector server failed") - return err - } + // Create and initialize the collector application + app := NewCollectorApp(confPath, version, log) - // Start collector server + // Setup graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -55,17 +35,82 @@ func Bootstrap(confPath, version string) error { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh + log.Info("received shutdown signal") cancel() }() - err = cs.Start(ctx) - if err != nil { - cs.Logger.Error(err, "start collector server failed") + // Start the application + if err := app.Start(); err != nil { + log.Error(err, "failed to start collector application") + return err + } + + // Wait for shutdown signal + <-ctx.Done() + + // Graceful shutdown + log.Info("shutting down collector application") + if err := app.Stop(); err != nil { + log.Error(err, "error during shutdown") return err } - // shutdown collector server - _ = cs.Close() + log.Info("HertzBeat Collector Go stopped successfully") + return nil +} + +// CollectorApp wraps the complete collector application +type CollectorApp struct { + confPath string + version string + logger logger.Logger + + // Core components + collectService *CollectService +} + +// NewCollectorApp creates a new collector application +func 
NewCollectorApp(confPath, version string, logger logger.Logger) *CollectorApp { + return &CollectorApp{ + confPath: confPath, + version: version, + logger: logger.WithName("collector-app"), + } +} + +// Start starts the collector application +func (app *CollectorApp) Start() error { + app.logger.Info("initializing collector application", "version", app.version) + + // Initialize collect service with enhanced scheduling + app.collectService = NewCollectService(app.logger) + + // Register built-in collectors + RegisterBuiltinCollectors(app.collectService, app.logger) + + app.logger.Info("collector application started successfully") + return nil +} + +// Stop stops the collector application +func (app *CollectorApp) Stop() error { + app.logger.Info("stopping collector application") + // TODO: Implement graceful shutdown of components + + app.logger.Info("collector application stopped successfully") return nil } + +// BannerAdapter adapts logger interface for banner printer +type BannerAdapter struct { + logger logger.Logger +} + +func (ba *BannerAdapter) Error(err error, msg string) { + ba.logger.Error(err, msg) +} + +func (ba *BannerAdapter) Info(msg string, keysAndValues ...interface{}) { + ba.logger.Info(msg, keysAndValues...) 
+} diff --git a/pkg/collector/collect_service.go b/pkg/collector/collect_service.go index 0f970be..9862b03 100644 --- a/pkg/collector/collect_service.go +++ b/pkg/collector/collect_service.go @@ -2,23 +2,30 @@ package collector import ( "fmt" + "time" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" ) // CollectService manages all collectors and provides a unified interface type CollectService struct { - registry *basic.CollectorRegistry - logger logger.Logger + registry *basic.CollectorRegistry + logger logger.Logger + timerDispatcher *timer.TimerDispatcher // Optional: for advanced task management + defaultPriority int // Default task priority + timeoutRetries int // Default retry count for timeouts } // NewCollectService creates a new collect service func NewCollectService(logger logger.Logger) *CollectService { service := &CollectService{ - registry: basic.NewCollectorRegistry(logger), - logger: logger.WithName("collect-service"), + registry: basic.NewCollectorRegistry(logger), + logger: logger.WithName("collect-service"), + defaultPriority: 5, // Medium priority (1-10 scale) + timeoutRetries: 3, // Default retry count } // Note: Collectors should be registered externally to avoid circular dependencies @@ -27,6 +34,32 @@ func NewCollectService(logger logger.Logger) *CollectService { return service } +// SetTimerDispatcher sets the timer dispatcher for advanced task management +func (cs *CollectService) SetTimerDispatcher(dispatcher *timer.TimerDispatcher) { + cs.timerDispatcher = dispatcher + cs.logger.Info("timer dispatcher configured for collect service") +} + +// SetDefaultPriority sets the default priority for collection tasks +func (cs *CollectService) SetDefaultPriority(priority int) { + if priority < 1 || priority > 10 { + 
cs.logger.Info("invalid priority, using default", "provided", priority, "default", cs.defaultPriority) + return + } + cs.defaultPriority = priority + cs.logger.Info("default priority updated", "priority", priority) +} + +// SetTimeoutRetries sets the default retry count for timeout handling +func (cs *CollectService) SetTimeoutRetries(retries int) { + if retries < 0 || retries > 10 { + cs.logger.Info("invalid retry count, using default", "provided", retries, "default", cs.timeoutRetries) + return + } + cs.timeoutRetries = retries + cs.logger.Info("timeout retries updated", "retries", retries) +} + // RegisterCollector registers a collector with the service func (cs *CollectService) RegisterCollector(protocol string, collector basic.AbstractCollector) { cs.registry.Register(protocol, collector) @@ -100,8 +133,209 @@ func (cs *CollectService) GetCollector(protocol string) (basic.AbstractCollector // DispatchMetricsTask implements MetricsTaskDispatcher interface func (cs *CollectService) DispatchMetricsTask(timeout *jobtypes.Timeout) error { - cs.logger.Info("dispatching metrics task", "timeout", timeout) - // TODO: Implement actual metrics task dispatching logic - // This would typically involve creating MetricsCollect tasks and submitting them to a worker pool + if timeout == nil { + cs.logger.Error(fmt.Errorf("timeout is nil"), "failed to dispatch metrics task") + return fmt.Errorf("timeout cannot be nil") + } + + cs.logger.Info("dispatching metrics task") + + // Extract job information from timeout task + var job *jobtypes.Job + if wheelTask, ok := timeout.Task().(*timer.WheelTimerTask); ok { + job = wheelTask.GetJob() + } else { + cs.logger.Error(fmt.Errorf("timeout task is not a WheelTimerTask"), "failed to dispatch metrics task") + return fmt.Errorf("timeout task must be a WheelTimerTask") + } + + if job == nil { + cs.logger.Error(fmt.Errorf("job is nil"), "failed to dispatch metrics task") + return fmt.Errorf("job cannot be nil") + } + + // Get metrics to 
collect for this job + metricsSet := job.GetNextCollectMetrics() + if len(metricsSet) == 0 { + cs.logger.Info("no metrics to collect for job", "job_id", job.ID, "app", job.App) + return nil + } + + cs.logger.Info("creating metrics collection tasks", + "job_id", job.ID, + "app", job.App, + "metrics_count", len(metricsSet)) + + // Create and submit collection tasks for each metrics + var lastError error + successCount := 0 + + for _, metrics := range metricsSet { + if err := cs.createAndSubmitTask(metrics, timeout, job); err != nil { + cs.logger.Error(err, "failed to create metrics collection task", + "metrics", metrics.Name, + "protocol", metrics.Protocol) + lastError = err + } else { + successCount++ + } + } + + cs.logger.Info("metrics tasks dispatched", + "total", len(metricsSet), + "success", successCount, + "failed", len(metricsSet)-successCount) + + // Return error only if all tasks failed + if successCount == 0 && lastError != nil { + return fmt.Errorf("all metrics tasks failed: %w", lastError) + } + + return nil +} + +// createAndSubmitTask creates a metrics collection task and submits it to the worker pool +func (cs *CollectService) createAndSubmitTask(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout, job *jobtypes.Job) error { + // Validate metrics + if metrics == nil { + return fmt.Errorf("metrics is nil") + } + + // Check if we have a collector for this protocol + if _, exists := cs.registry.GetCollector(metrics.Protocol); !exists { + return fmt.Errorf("no collector found for protocol: %s", metrics.Protocol) + } + + cs.logger.Info("creating metrics collection task", + "metrics", metrics.Name, + "protocol", metrics.Protocol, + "host", metrics.Host, + "port", metrics.Port) + + // Calculate task priority based on metrics configuration + taskPriority := cs.calculateTaskPriority(metrics, job) + + // Add timeout monitoring if timer dispatcher is available + if cs.timerDispatcher != nil { + cs.timerDispatcher.AddMetricsTimeout(metrics, timeout) + defer 
cs.timerDispatcher.RemoveMetricsTimeout(metrics, timeout) + } + + // Perform collection with priority and timeout handling + start := time.Now() + result := cs.collectWithPriority(metrics, timeout, taskPriority) + duration := time.Since(start) + + if result == nil { + return fmt.Errorf("collection returned nil result") + } + + cs.logger.Info("metrics collection completed", + "metrics", metrics.Name, + "protocol", metrics.Protocol, + "priority", taskPriority, + "duration", duration, + "code", result.Code, + "message", result.Msg) + + // TODO: Dispatch collected data to appropriate handlers + // This would typically involve calling collectDataDispatch.dispatchCollectData() + return nil } + +// calculateTaskPriority calculates the priority for a metrics collection task +func (cs *CollectService) calculateTaskPriority(metrics *jobtypes.Metrics, job *jobtypes.Job) int { + priority := cs.defaultPriority + + // Adjust priority based on metrics configuration + if metrics.Priority > 0 { + priority = metrics.Priority + } + + // Adjust priority based on protocol criticality + switch metrics.Protocol { + case "icmp", "ping": + priority += 2 // High priority for availability checks + case "http", "https": + priority += 1 // Medium-high priority for web services + case "snmp": + priority -= 1 // Lower priority for SNMP (usually less time-sensitive) + } + + // Ensure priority stays within valid range + if priority < 1 { + priority = 1 + } else if priority > 10 { + priority = 10 + } + + return priority +} + +// collectWithPriority performs metrics collection with priority and timeout handling +func (cs *CollectService) collectWithPriority(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout, priority int) *jobtypes.CollectRepMetricsData { + // Create channel for result + resultChan := make(chan *jobtypes.CollectRepMetricsData, 1) + + // Calculate timeout duration based on priority (higher priority gets more time) + baseTimeout := 60 * time.Second + if metrics.Timeout != "" { + if 
duration, err := time.ParseDuration(metrics.Timeout); err == nil { + baseTimeout = duration + } + } + + // Adjust timeout based on priority + timeoutDuration := baseTimeout + if priority >= 8 { + timeoutDuration = baseTimeout + (baseTimeout / 2) // 50% more time for high priority + } else if priority <= 3 { + timeoutDuration = baseTimeout - (baseTimeout / 4) // 25% less time for low priority + } + + cs.logger.Info("starting prioritized collection", + "metrics", metrics.Name, + "protocol", metrics.Protocol, + "priority", priority, + "timeout", timeoutDuration) + + // Perform collection in goroutine + go func() { + defer func() { + if r := recover(); r != nil { + cs.logger.Error(fmt.Errorf("panic in collection: %v", r), "metrics collection panicked", + "metrics", metrics.Name, "protocol", metrics.Protocol, "priority", priority) + resultChan <- &jobtypes.CollectRepMetricsData{ + Code: 500, + Msg: fmt.Sprintf("collection panicked: %v", r), + } + } + }() + + // Add priority-based delay for lower priority tasks (simple congestion control) + if priority <= 3 { + time.Sleep(time.Duration(5-priority) * 100 * time.Millisecond) + } + + result := cs.Collect(metrics) + resultChan <- result + }() + + // Wait for result or timeout + select { + case result := <-resultChan: + return result + case <-time.After(timeoutDuration): + cs.logger.Info("metrics collection timed out", + "metrics", metrics.Name, + "protocol", metrics.Protocol, + "priority", priority, + "timeout", timeoutDuration) + + return &jobtypes.CollectRepMetricsData{ + Code: 504, // Gateway Timeout + Msg: fmt.Sprintf("collection timed out after %v (priority: %d)", timeoutDuration, priority), + } + } +} diff --git a/pkg/collector/common/dispatcher/entrance/collect_server.go b/pkg/collector/common/dispatcher/entrance/collect_server.go index c657f81..8e66e87 100644 --- a/pkg/collector/common/dispatcher/entrance/collect_server.go +++ b/pkg/collector/common/dispatcher/entrance/collect_server.go @@ -5,9 +5,11 @@ import ( 
"sync" "time" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" ) // CollectServer manages the complete collection infrastructure including @@ -20,6 +22,7 @@ type CollectServer struct { timerDispatcher *timer.TimerDispatcher workerPool *worker.WorkerPool collectJobService *CollectJobService + collectService *collector.CollectService networkClient *NetworkClient // Component states @@ -138,6 +141,11 @@ func (cs *CollectServer) initializeComponents() error { // Initialize timer dispatcher cs.timerDispatcher = timer.NewTimerDispatcher(cs.logger) + // Initialize collect service with enhanced scheduling capabilities + cs.collectService = collector.NewCollectService(cs.logger) + cs.collectService.SetTimerDispatcher(cs.timerDispatcher) + collector.RegisterBuiltinCollectors(cs.collectService, cs.logger) + // Initialize collect job service cs.collectJobService = NewCollectJobService( cs.timerDispatcher, @@ -163,14 +171,18 @@ func (cs *CollectServer) wireComponents() { // Set network client in collect job service cs.collectJobService.SetNetworkClient(cs.networkClient) + // Wire collect service with timer dispatcher for enhanced scheduling + cs.timerDispatcher.SetCollectService(cs.collectService) + cs.timerDispatcher.SetCollectDataDispatcher(cs.collectJobService) + // Register message processors cs.registerMessageProcessors() - // Set collect job service as the metrics task dispatcher - cs.timerDispatcher.SetMetricsTaskDispatcher(cs.collectJobService) + // TimerDispatcher implements MetricsTaskDispatcher itself, so we use it as its own dispatcher + cs.timerDispatcher.SetMetricsDispatcher(cs.timerDispatcher) - // Set collect job service as the collect data dispatcher - // 
cs.timerDispatcher.SetCollectDataDispatcher(cs.collectJobService) // TODO: Fix interface compatibility + // TODO: Set collect service as the primary metrics task dispatcher (enhanced version) + // cs.timerDispatcher.SetMetricsDispatcher(cs.collectService) } // registerMessageProcessors registers all message processors with the network client. @@ -287,6 +299,37 @@ func (cs *CollectServer) IsStarted() bool { return cs.isStarted } +// ReceiveJob simulates receiving a job from the manager and adds it to the dispatcher. +// This method is used for testing and simulation purposes when network communication is not available. +func (cs *CollectServer) ReceiveJob(job *jobtypes.Job, eventListener timer.CollectResponseEventListener) error { + if !cs.isStarted { + return fmt.Errorf("collect server is not started") + } + + cs.logger.Info("received job from manager (simulated)", + "jobId", job.ID, + "monitorId", job.MonitorID, + "app", job.App, + "category", job.Category, + "isCyclic", job.IsCyclic, + "interval", job.DefaultInterval) + + // Add the job to the timer dispatcher for scheduling + if err := cs.timerDispatcher.AddJob(job, eventListener); err != nil { + cs.logger.Error(err, "failed to add received job to timer dispatcher", + "jobId", job.ID, + "monitorId", job.MonitorID) + return fmt.Errorf("failed to schedule received job: %w", err) + } + + cs.logger.Info("job successfully scheduled for execution", + "jobId", job.ID, + "monitorId", job.MonitorID, + "nextExecution", "immediate") + + return nil +} + // GetCollectJobService returns the collect job service. 
func (cs *CollectServer) GetCollectJobService() *CollectJobService { return cs.collectJobService diff --git a/pkg/collector/common/timer/timer_dispatcher.go b/pkg/collector/common/timer/timer_dispatcher.go index 1f78db5..215a406 100644 --- a/pkg/collector/common/timer/timer_dispatcher.go +++ b/pkg/collector/common/timer/timer_dispatcher.go @@ -42,6 +42,12 @@ type TimerDispatcher struct { tempTasks sync.Map // map[int64]*jobtypes.Timeout eventListeners sync.Map // map[int64]CollectResponseEventListener + // Timeout monitoring + timeoutMonitor sync.Map // map[string]*MetricsTimeoutInfo + timeoutCheckInterval time.Duration + timeoutCheckTicker *time.Ticker + stopTimeoutMonitor chan struct{} + // State management started atomic.Bool @@ -53,15 +59,27 @@ type TimerDispatcher struct { logger logger.Logger } +// MetricsTimeoutInfo tracks timeout information for metrics collection tasks +type MetricsTimeoutInfo struct { + StartTime time.Time + Metrics *jobtypes.Metrics + Timeout *jobtypes.Timeout + MaxDuration time.Duration + RetryCount int + MaxRetries int +} + // NewTimerDispatcher creates a new timer dispatcher func NewTimerDispatcher(logger logger.Logger) *TimerDispatcher { // Create worker pool with default configuration workerConfig := worker.DefaultWorkerPoolConfig() td := &TimerDispatcher{ - wheelTimer: NewTimerWheel(logger.WithName("timer-wheel")), - workerPool: worker.NewPriorityWorkerPool(workerConfig, logger), - logger: logger.WithName("timer-dispatcher"), + wheelTimer: NewTimerWheel(logger.WithName("timer-wheel")), + workerPool: worker.NewPriorityWorkerPool(workerConfig, logger), + timeoutCheckInterval: 10 * time.Second, // Check for timeouts every 10 seconds + stopTimeoutMonitor: make(chan struct{}), + logger: logger.WithName("timer-dispatcher"), } td.started.Store(true) @@ -99,6 +117,9 @@ func (td *TimerDispatcher) Start() error { return fmt.Errorf("failed to start timer wheel: %w", err) } + // Start timeout monitoring + td.startTimeoutMonitoring() + 
td.logger.Info("timer dispatcher started successfully") return nil } @@ -110,6 +131,9 @@ func (td *TimerDispatcher) Stop() error { // Mark as offline td.GoOffline() + // Stop timeout monitoring + td.stopTimeoutMonitoring() + // Stop the timer wheel first if err := td.wheelTimer.Stop(); err != nil { td.logger.Info("failed to stop timer wheel", "error", err) @@ -431,3 +455,176 @@ func (td *TimerDispatcher) SetMetricsTaskDispatcher(dispatcher interface{}) { // This method is for interface compatibility with communication layer // The TimerDispatcher itself implements MetricsTaskDispatcher } + +// startTimeoutMonitoring starts the timeout monitoring goroutine +func (td *TimerDispatcher) startTimeoutMonitoring() { + td.timeoutCheckTicker = time.NewTicker(td.timeoutCheckInterval) + + go func() { + defer td.timeoutCheckTicker.Stop() + + for { + select { + case <-td.timeoutCheckTicker.C: + td.checkTimeouts() + case <-td.stopTimeoutMonitor: + td.logger.Info("timeout monitoring stopped") + return + } + } + }() + + td.logger.Info("timeout monitoring started", "interval", td.timeoutCheckInterval) +} + +// stopTimeoutMonitoring stops the timeout monitoring +func (td *TimerDispatcher) stopTimeoutMonitoring() { + if td.timeoutCheckTicker != nil { + close(td.stopTimeoutMonitor) + td.timeoutCheckTicker.Stop() + } +} + +// checkTimeouts checks for timed out metrics collection tasks +func (td *TimerDispatcher) checkTimeouts() { + now := time.Now() + var timeoutKeys []string + + td.timeoutMonitor.Range(func(key, value interface{}) bool { + timeoutKey := key.(string) + timeoutInfo := value.(*MetricsTimeoutInfo) + + // Check if task has timed out + if now.Sub(timeoutInfo.StartTime) > timeoutInfo.MaxDuration { + timeoutKeys = append(timeoutKeys, timeoutKey) + + td.logger.Info("metrics collection task timed out", + "key", timeoutKey, + "metrics", timeoutInfo.Metrics.Name, + "duration", now.Sub(timeoutInfo.StartTime), + "maxDuration", timeoutInfo.MaxDuration, + "retryCount", 
timeoutInfo.RetryCount) + } + + return true + }) + + // Handle timed out tasks + for _, timeoutKey := range timeoutKeys { + td.handleTimeout(timeoutKey) + } +} + +// handleTimeout handles a timed out metrics collection task +func (td *TimerDispatcher) handleTimeout(timeoutKey string) { + value, exists := td.timeoutMonitor.Load(timeoutKey) + if !exists { + return + } + + timeoutInfo := value.(*MetricsTimeoutInfo) + + // Remove from timeout monitor + td.timeoutMonitor.Delete(timeoutKey) + + // Check if we should retry + if timeoutInfo.RetryCount < timeoutInfo.MaxRetries { + td.logger.Info("retrying timed out metrics collection task", + "key", timeoutKey, + "metrics", timeoutInfo.Metrics.Name, + "retryCount", timeoutInfo.RetryCount+1, + "maxRetries", timeoutInfo.MaxRetries) + + // Retry the task + td.retryMetricsCollection(timeoutInfo) + } else { + td.logger.Info("metrics collection task exceeded max retries", + "key", timeoutKey, + "metrics", timeoutInfo.Metrics.Name, + "retryCount", timeoutInfo.RetryCount, + "maxRetries", timeoutInfo.MaxRetries) + + // TODO: Notify failure to appropriate handlers + // This could involve sending a failure response or triggering alerts + } +} + +// retryMetricsCollection retries a failed metrics collection task +func (td *TimerDispatcher) retryMetricsCollection(timeoutInfo *MetricsTimeoutInfo) { + // Create new timeout info with incremented retry count + newTimeoutInfo := &MetricsTimeoutInfo{ + StartTime: time.Now(), + Metrics: timeoutInfo.Metrics, + Timeout: timeoutInfo.Timeout, + MaxDuration: timeoutInfo.MaxDuration, + RetryCount: timeoutInfo.RetryCount + 1, + MaxRetries: timeoutInfo.MaxRetries, + } + + // Generate retry key + retryKey := fmt.Sprintf("%s-retry-%d", + td.generateTimeoutKey(timeoutInfo.Metrics, timeoutInfo.Timeout), + newTimeoutInfo.RetryCount) + + // Add to timeout monitor + td.timeoutMonitor.Store(retryKey, newTimeoutInfo) + + // Dispatch the retry task with exponential backoff + retryDelay := 
time.Duration(timeoutInfo.RetryCount) * 5 * time.Second + time.AfterFunc(retryDelay, func() { + if td.metricsDispatcher != nil { + if err := td.metricsDispatcher.DispatchMetricsTask(timeoutInfo.Timeout); err != nil { + td.logger.Error(err, "failed to retry metrics collection task", + "key", retryKey, + "metrics", timeoutInfo.Metrics.Name) + // Remove from monitor if retry dispatch fails + td.timeoutMonitor.Delete(retryKey) + } + } + }) +} + +// AddMetricsTimeout adds a metrics collection task to timeout monitoring +func (td *TimerDispatcher) AddMetricsTimeout(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) { + // Calculate max duration based on metrics timeout or default + maxDuration := 120 * time.Second // default 2 minutes + if metrics.Timeout != "" { + if duration, err := time.ParseDuration(metrics.Timeout); err == nil { + maxDuration = duration + } + } + + timeoutInfo := &MetricsTimeoutInfo{ + StartTime: time.Now(), + Metrics: metrics, + Timeout: timeout, + MaxDuration: maxDuration, + RetryCount: 0, + MaxRetries: 3, // default max retries + } + + key := td.generateTimeoutKey(metrics, timeout) + td.timeoutMonitor.Store(key, timeoutInfo) + + td.logger.Info("added metrics timeout monitoring", + "key", key, + "metrics", metrics.Name, + "maxDuration", maxDuration, + "maxRetries", timeoutInfo.MaxRetries) +} + +// RemoveMetricsTimeout removes a metrics collection task from timeout monitoring +func (td *TimerDispatcher) RemoveMetricsTimeout(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) { + key := td.generateTimeoutKey(metrics, timeout) + td.timeoutMonitor.Delete(key) + + td.logger.Info("removed metrics timeout monitoring", + "key", key, + "metrics", metrics.Name) +} + +// generateTimeoutKey generates a unique key for timeout monitoring +func (td *TimerDispatcher) generateTimeoutKey(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) string { + // Use a combination of metrics name and timestamp to ensure uniqueness + return fmt.Sprintf("%s-%d-%s", 
metrics.Name, time.Now().UnixNano(), metrics.Protocol) +} diff --git a/pkg/collector/config/config.go b/pkg/collector/config/config.go deleted file mode 100644 index 7b1270d..0000000 --- a/pkg/collector/config/config.go +++ /dev/null @@ -1,118 +0,0 @@ -package config - -import ( - "context" - "errors" - "os" - - "gopkg.in/yaml.v3" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -) - -const ( - DefaultHertzBeatCollectorName = "hertzbeat-collector" -) - -type HookFunc func(c context.Context, server *server.CollectorServer) error - -type Loader struct { - cfgPath string - logger logger.Logger - cancel context.CancelFunc - server *server.CollectorServer - - hook HookFunc - - // todo file watcher - // watcher *fsnotify.Watcher -} - -func New(cfgPath string, server *server.CollectorServer, f HookFunc) *Loader { - - return &Loader{ - cfgPath: cfgPath, - server: server, - logger: server.Logger.WithName("collector-config-loader"), - hook: f, - } -} - -func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) { - - ld.runHook() - - if ld.cfgPath == "" { - ld.logger.Info("collector-config-loader: path is empty") - return nil, errors.New("collector-config-loader: path is empty") - } - - if _, err := os.Stat(ld.cfgPath); os.IsNotExist(err) { - ld.logger.Error(err, "collector-config-loader: file not exist", "path", ld.cfgPath) - return nil, err - } - - file, err := os.Open(ld.cfgPath) - if err != nil { - return nil, err - } - defer func(file *os.File) { - err := file.Close() - if err != nil { - ld.logger.Error(err, "close config file failed") - } - }(file) - - var cfg types.CollectorConfig - decoder := yaml.NewDecoder(file) - if err := decoder.Decode(&cfg); err != nil { - ld.logger.Error(err, "decode config file failed") - return nil, err - } - - return &cfg, nil -} - -func (ld *Loader) ValidateConfig(cfg *types.CollectorConfig) error { 
- - if cfg == nil { - ld.logger.Sugar().Debug("collector-config-loader is nil") - return errors.New("collector-config-loader is nil") - } - - // other check - if cfg.Collector.Info.IP == "" { - ld.logger.Sugar().Debug("collector-config-loader ip is empty") - return errors.New("collector-config-loader ip is empty") - } - - if cfg.Collector.Info.Port == "" { - ld.logger.Sugar().Debug("collector-config-loader port is empty") - return errors.New("collector-config-loader port is empty") - } - - if cfg.Collector.Info.Name == "" { - ld.logger.Sugar().Debug("collector-config-loader: name is empty") - cfg.Collector.Info.Name = DefaultHertzBeatCollectorName - } - - return nil -} - -func (r *Loader) runHook() { - - if r.hook == nil { - return - } - - r.logger.Info("running hook") - c, cancel := context.WithCancel(context.TODO()) - r.cancel = cancel - go func(ctx context.Context) { - if err := r.hook(ctx, r.server); err != nil { - r.logger.Error(err, "hook error") - } - }(c) -} diff --git a/pkg/collector/config/config_test.go b/pkg/collector/config/config_test.go deleted file mode 100644 index b16fc79..0000000 --- a/pkg/collector/config/config_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package config - -import ( - "os" - "testing" -) - -func TestLoadConfig(t *testing.T) { - - content := `{ - "collector": { - "version": "1.0.0", - "ip": "127.0.0.1" - }, - "dispatcher": {} - }` - dumpfile, err := os.CreateTemp("", "collector_config_*.json") - if err != nil { - t.Fatalf("failed to create temp file: %v", err) - } - defer func(name string) { - err := os.Remove(name) - if err != nil { - t.Logf("failed to remove temp file: %v", err) - } - }(dumpfile.Name()) - if _, err := dumpfile.Write([]byte(content)); err != nil { - t.Fatalf("failed to write to temp file: %v", err) - } - err = dumpfile.Close() - if err != nil { - return - } - - cfg, err := LoadConfig(dumpfile.Name()) - if err != nil { - t.Fatalf("LoadConfig failed: %v", err) - } - if cfg.Collector.Version != "1.0.0" { - 
t.Errorf("expected version '1.0.0', got '%s'", cfg.Collector.Version) - } - if cfg.Collector.IP != "127.0.0.1" { - t.Errorf("expected ip '127.0.0.1', got '%s'", cfg.Collector.IP) - } -} diff --git a/pkg/collector/registry.go b/pkg/collector/registry.go index 64af30f..7f8331c 100644 --- a/pkg/collector/registry.go +++ b/pkg/collector/registry.go @@ -18,23 +18,40 @@ package collector import ( - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/database" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry" "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + + // 导入所有采集器以触发自动注册 + _ "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/database" ) -// RegisterBuiltinCollectors registers all built-in collectors with the service -// This function should be called during application initialization to avoid circular dependencies +// RegisterBuiltinCollectors 自动注册所有内置采集器 +// 使用新的自动注册机制,无需手动添加每个采集器 func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) { - // Register JDBC collector - jdbcCollector := database.NewJDBCCollector(logger) - service.RegisterCollector(jdbcCollector.SupportProtocol(), jdbcCollector) + // 从注册中心创建所有启用的采集器 + collectors, err := registry.GetGlobalCenter().CreateCollectors(logger) + if err != nil { + logger.Error(err, "创建采集器失败") + return + } + + // 注册到CollectService + for protocol, collector := range collectors { + service.RegisterCollector(protocol, collector) + logger.Info("采集器注册到服务", "protocol", protocol) + } +} - // TODO: Register other built-in collectors here - // httpCollector := http.NewHTTPCollector(logger) - // service.RegisterCollector(httpCollector.SupportProtocol(), httpCollector) +// RegisterBuiltinCollectorsLegacy 传统的手动注册方式 (已废弃) +// 保留用于兼容性,建议使用RegisterBuiltinCollectors +// +// Deprecated: 使用RegisterBuiltinCollectors替代,它会自动注册所有采集器 +func RegisterBuiltinCollectorsLegacy(service *CollectService, logger logger.Logger) { + logger.Info("⚠️ 使用已废弃的手动注册方式,建议切换到自动注册") - // 
sshCollector := ssh.NewSSHCollector(logger) - // service.RegisterCollector(sshCollector.SupportProtocol(), sshCollector) + // 这里可以保留原来的手动注册逻辑用于兼容 + // jdbcCollector := database.NewJDBCCollector(logger) + // service.RegisterCollector(jdbcCollector.SupportProtocol(), jdbcCollector) } // NewCollectServiceWithBuiltins creates a new collect service and registers all built-in collectors diff --git a/pkg/collector/registry/registry_center.go b/pkg/collector/registry/registry_center.go new file mode 100644 index 0000000..0cc8a4e --- /dev/null +++ b/pkg/collector/registry/registry_center.go @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package registry
+
+import (
+	"fmt"
+	"sort"
+	"sync"
+
+	"hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic"
+	"hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+)
+
+// CollectorFactory builds a collector instance bound to the given logger.
+type CollectorFactory func(logger logger.Logger) basic.AbstractCollector
+
+// CollectorRegistration holds the registration metadata for one protocol.
+type CollectorRegistration struct {
+	Protocol string           // protocol identifier, e.g. "jdbc"
+	Factory  CollectorFactory // factory used to create the collector
+	Enabled  bool             // whether CreateCollectors instantiates this entry
+	Priority int              // creation order; a smaller number means higher priority
+}
+
+// clone returns a shallow copy of reg so callers cannot mutate registry state.
+// (Factory is a function value and is shared intentionally.)
+func (reg *CollectorRegistration) clone() *CollectorRegistration {
+	c := *reg
+	return &c
+}
+
+// RegistryCenter is a thread-safe registry of collector factories keyed by protocol.
+type RegistryCenter struct {
+	registrations map[string]*CollectorRegistration
+	mutex         sync.RWMutex // guards registrations
+}
+
+// globalCenter is the process-wide registry used by the package-level helpers below.
+var globalCenter = &RegistryCenter{
+	registrations: make(map[string]*CollectorRegistration),
+}
+
+// GetGlobalCenter returns the process-wide registry instance.
+func GetGlobalCenter() *RegistryCenter {
+	return globalCenter
+}
+
+// RegistrationOption customizes a CollectorRegistration at registration time.
+type RegistrationOption func(*CollectorRegistration)
+
+// WithDisabled registers the collector in a disabled state.
+func WithDisabled() RegistrationOption {
+	return func(reg *CollectorRegistration) {
+		reg.Enabled = false
+	}
+}
+
+// WithPriority sets the creation priority (lower is created first).
+func WithPriority(priority int) RegistrationOption {
+	return func(reg *CollectorRegistration) {
+		reg.Priority = priority
+	}
+}
+
+// RegisterFactory registers the factory for a protocol, silently replacing any
+// previous registration for the same protocol.
+func (rc *RegistryCenter) RegisterFactory(protocol string, factory CollectorFactory, options ...RegistrationOption) {
+	rc.mutex.Lock()
+	defer rc.mutex.Unlock()
+
+	registration := &CollectorRegistration{
+		Protocol: protocol,
+		Factory:  factory,
+		Enabled:  true, // enabled by default
+		Priority: 100,  // default priority
+	}
+
+	// Apply caller-supplied options (e.g. WithDisabled, WithPriority).
+	for _, option := range options {
+		option(registration)
+	}
+
+	rc.registrations[protocol] = registration
+}
+
+// GetRegistrations returns a copy of all registrations, keyed by protocol.
+// Copies are returned so external code cannot mutate registry state.
+func (rc *RegistryCenter) GetRegistrations() map[string]*CollectorRegistration {
+	rc.mutex.RLock()
+	defer rc.mutex.RUnlock()
+
+	result := make(map[string]*CollectorRegistration, len(rc.registrations))
+	for protocol, reg := range rc.registrations {
+		result[protocol] = reg.clone()
+	}
+	return result
+}
+
+// GetRegistration returns a copy of the registration for protocol,
+// and whether the protocol is registered at all.
+func (rc *RegistryCenter) GetRegistration(protocol string) (*CollectorRegistration, bool) {
+	rc.mutex.RLock()
+	defer rc.mutex.RUnlock()
+
+	reg, exists := rc.registrations[protocol]
+	if !exists {
+		return nil, false
+	}
+	return reg.clone(), true
+}
+
+// SetEnabled toggles the enabled flag for protocol.
+// It returns false when the protocol is not registered.
+func (rc *RegistryCenter) SetEnabled(protocol string, enabled bool) bool {
+	rc.mutex.Lock()
+	defer rc.mutex.Unlock()
+
+	reg, exists := rc.registrations[protocol]
+	if !exists {
+		return false
+	}
+	reg.Enabled = enabled
+	return true
+}
+
+// GetEnabledProtocols returns the protocols whose registrations are enabled.
+func (rc *RegistryCenter) GetEnabledProtocols() []string {
+	rc.mutex.RLock()
+	defer rc.mutex.RUnlock()
+
+	var enabled []string
+	for protocol, reg := range rc.registrations {
+		if reg.Enabled {
+			enabled = append(enabled, protocol)
+		}
+	}
+	return enabled
+}
+
+// GetAllProtocols returns every registered protocol, enabled or not.
+func (rc *RegistryCenter) GetAllProtocols() []string {
+	rc.mutex.RLock()
+	defer rc.mutex.RUnlock()
+
+	protocols := make([]string, 0, len(rc.registrations))
+	for protocol := range rc.registrations {
+		protocols = append(protocols, protocol)
+	}
+	return protocols
+}
+
+// CreateCollectors instantiates every enabled collector, in priority order
+// (lower Priority first). Factories that return nil or report a protocol that
+// does not match their registration are logged and skipped; the error result
+// is currently always nil and is kept for interface stability.
+func (rc *RegistryCenter) CreateCollectors(logger logger.Logger) (map[string]basic.AbstractCollector, error) {
+	registrations := rc.GetRegistrations()
+
+	// Collect the enabled registrations and sort them by priority.
+	sortedRegs := make([]*CollectorRegistration, 0, len(registrations))
+	for _, reg := range registrations {
+		if reg.Enabled {
+			sortedRegs = append(sortedRegs, reg)
+		}
+	}
+	sort.Slice(sortedRegs, func(i, j int) bool {
+		return sortedRegs[i].Priority < sortedRegs[j].Priority
+	})
+
+	collectors := make(map[string]basic.AbstractCollector, len(sortedRegs))
+
+	for _, reg := range sortedRegs {
+		collector := reg.Factory(logger.WithName("collector-" + reg.Protocol))
+		if collector == nil {
+			logger.Error(fmt.Errorf("工厂函数返回nil"), "采集器创建失败", "protocol", reg.Protocol)
+			continue
+		}
+
+		// Guard against a factory that reports a different protocol than it
+		// was registered under.
+		if collector.SupportProtocol() != reg.Protocol {
+			logger.Error(fmt.Errorf("协议不匹配"), "注册协议与实际协议不符",
+				"expected", reg.Protocol,
+				"actual", collector.SupportProtocol())
+			continue
+		}
+
+		collectors[reg.Protocol] = collector
+		logger.Info("采集器创建成功",
+			"protocol", reg.Protocol,
+			"priority", reg.Priority)
+	}
+
+	logger.Info("采集器创建完成",
+		"成功", len(collectors),
+		"总数", len(sortedRegs))
+
+	return collectors, nil
+}
+
+// Package-level convenience wrappers around the global registry.
+
+// RegisterCollectorFactory registers a collector factory with the global center.
+func RegisterCollectorFactory(protocol string, factory CollectorFactory, options ...RegistrationOption) {
+	globalCenter.RegisterFactory(protocol, factory, options...)
+}
+
+// GetRegisteredProtocols returns every protocol known to the global center.
+func GetRegisteredProtocols() []string {
+	return globalCenter.GetAllProtocols()
+}
+
+// GetEnabledProtocols returns the enabled protocols of the global center.
+func GetEnabledProtocols() []string {
+	return globalCenter.GetEnabledProtocols()
+}
+
+// EnableProtocol enables protocol in the global center; returns false when unregistered.
+func EnableProtocol(protocol string, logger logger.Logger) bool {
+	if globalCenter.SetEnabled(protocol, true) {
+		logger.Info("启用协议", "protocol", protocol)
+		return true
+	}
+	logger.Info("协议未注册,无法启用", "protocol", protocol)
+	return false
+}
+
+// DisableProtocol disables protocol in the global center; returns false when unregistered.
+func DisableProtocol(protocol string, logger logger.Logger) bool {
+	if globalCenter.SetEnabled(protocol, false) {
+		logger.Info("禁用协议", "protocol", protocol)
+		return true
+	}
+	logger.Info("协议未注册,无法禁用", "protocol", protocol)
+	return false
+}
diff --git a/pkg/collector/server/server.go b/pkg/collector/server/server.go
deleted file mode 100644
index d1f0de7..0000000
--- a/pkg/collector/server/server.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package server
-
-import (
-	"context"
-	"os"
-
"hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/job" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/transport" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -) - -const ( - DefaultHertzBeatCollectorVersion = "0.0.1-DEV" -) - -type Run interface { - Start(ctx context.Context) error - Close() error -} - -// CollectorServer HertzBeat Collector Server -type CollectorServer struct { - Version string - Logger logger.Logger - - job *job.Server - transport *transport.Server -} - -func NewCollectorServer(version string) *CollectorServer { - - if version == "" { - version = DefaultHertzBeatCollectorVersion - } - - return &CollectorServer{ - Version: version, - Logger: logger.DefaultLogger(os.Stdout, types.LogLevelDebug), - } -} - -func (s *CollectorServer) Start(ctx context.Context) error { - - s.Logger.Info("hi, starting collector server...") - - // start job server - s.job = job.NewServer(s.Logger.WithName("job")) - - // init and start transport server - s.transport = transport.NewServer(s.Logger.WithName("transport")) - - // Wait until done - <-ctx.Done() - - return nil -} - -func (s *CollectorServer) Validate() error { - - return nil -} - -// Close Shutdown the server hook -func (s *CollectorServer) Close() error { - - s.Logger.Info("collector server shutting down... 
bye!")
-	return nil
-}
diff --git a/pkg/collector/server/server_test.go b/pkg/collector/server/server_test.go
deleted file mode 100644
index abb4e43..0000000
--- a/pkg/collector/server/server_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package server
diff --git a/pkg/types/job/job_types.go b/pkg/types/job/job_types.go
index ef58676..50ee83d 100644
--- a/pkg/types/job/job_types.go
+++ b/pkg/types/job/job_types.go
@@ -32,3 +32,20 @@ type Job struct {
 	PriorMetrics []Metrics `json:"-"`
 	ResponseDataTemp []MetricsData `json:"-"`
 }
+
+// GetNextCollectMetrics returns the metrics that should be collected next
+// This is a simplified version - in the full implementation this would handle
+// metric priorities, dependencies, and collection levels
+func (j *Job) GetNextCollectMetrics() []*Metrics {
+	// The returned pointers alias entries of j.Metrics, so callers mutate
+	// this Job's metrics in place.
+	result := make([]*Metrics, 0, len(j.Metrics))
+	for i := range j.Metrics {
+		result = append(result, &j.Metrics[i])
+	}
+	return result
+}
+
+// ConstructPriorMetrics prepares prior metrics for collection dependencies
+func (j *Job) ConstructPriorMetrics() {
+	// NOTE(review): copy performs a shallow struct copy; if Metrics contains
+	// slice/map fields they remain shared with j.Metrics — confirm intended.
+	j.PriorMetrics = make([]Metrics, len(j.Metrics))
+	copy(j.PriorMetrics, j.Metrics)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
