This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch reconfiguration_scheduling in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 75f4f7997ac1909c53d7b297a5a4491e4cb1eb24 Author: yuluo-yx <[email protected]> AuthorDate: Sun Aug 31 13:08:07 2025 +0800 init commit Signed-off-by: yuluo-yx <[email protected]> --- .github/.keep | 0 .gitignore | 49 +++++++++ .go-version | 1 + Dockerfile | 0 Makefile | 23 ++++ VERSION | 1 + cmd/collector/hertzbeat-collector.yaml | 9 ++ cmd/collector/main.go | 24 +++++ go.mod | 27 +++++ go.sum | 47 ++++++++ pkg/banner/banner.txt | 5 + pkg/banner/embed.go | 54 ++++++++++ pkg/collector/basic/.keep | 0 pkg/collector/basic/database/.keep | 0 pkg/collector/basic/dns/.keep | 0 pkg/collector/basic/ftp/.keep | 0 pkg/collector/basic/http/.keep | 0 pkg/collector/basic/icmp/.keep | 0 pkg/collector/basic/imap/.keep | 0 pkg/collector/basic/ipmi2/.keep | 0 pkg/collector/basic/jmx/.keep | 0 pkg/collector/basic/memcached/.keep | 0 pkg/collector/basic/modbus/.keep | 0 pkg/collector/basic/mqtt/.keep | 0 pkg/collector/basic/nginx/.keep | 0 pkg/collector/basic/ntp/.keep | 0 pkg/collector/basic/plc/.keep | 0 pkg/collector/basic/pop3/.keep | 0 pkg/collector/basic/prometheus/.keep | 0 pkg/collector/basic/push/.keep | 0 pkg/collector/basic/redfish/.keep | 0 pkg/collector/basic/redis/.keep | 0 pkg/collector/basic/registry/.keep | 0 pkg/collector/basic/s7/.keep | 0 pkg/collector/basic/script/.keep | 0 pkg/collector/basic/sd/.keep | 0 pkg/collector/basic/smtp/.keep | 0 pkg/collector/basic/snmp/.keep | 0 pkg/collector/basic/ssh/.keep | 0 pkg/collector/basic/telnet/.keep | 0 pkg/collector/basic/udp/.keep | 0 pkg/collector/basic/websocket/.keep | 0 pkg/collector/bootstrap.go | 71 ++++++++++++ pkg/collector/common/cache/.keep | 0 pkg/collector/common/collect/dispatch/.keep | 0 .../collect/metrics/hertzbeat_metrics_collector.go | 68 ++++++++++++ .../common/collect/strategy/strategy_factory.go | 39 +++++++ pkg/collector/common/dispatcher/exporter/.keep | 0 pkg/collector/common/ssh/.keep | 0 pkg/collector/config/config.go | 119 ++++++++++++++++++++ pkg/collector/config/config_test.go | 45 ++++++++ pkg/collector/internel/server.go | 48 +++++++++ pkg/collector/kafka/.keep | 0 pkg/collector/mongodb/.keep | 0 pkg/collector/nebulagraph/.keep | 0 pkg/collector/rocketmq/.keep | 0 pkg/constants/const.go | 27 +++++ pkg/logger/logger.go | 120 +++++++++++++++++++++ pkg/logger/logger_test.go | 101 +++++++++++++++++ pkg/types/config_types.go | 22 ++++ pkg/types/job_types.go | 35 ++++++ pkg/types/logging_types.go | 65 +++++++++++ pkg/util/.keep | 0 63 files changed, 1000 insertions(+) diff --git a/.github/.keep b/.github/.keep new file mode 100644 index 0000000..e69de29 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed854b5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,49 @@ +# git history files +.history_rewritten_* +# Eclipse artifacts +.project +.pydevproject +#Vagrant +tools/vagrant/.vagrant/ +# Intellij +*.iml +.idea/ +.run/ +# Visual Studio Code +.vscode/ +# Bazel +/bazel-* +# vi swap files +.*.swp +# vi backups +*.bak +# constants backups +*~ +# python artifacts +*.pyc + +# lint +lintconfig.gen.json + +*.orig + +LICENSES.txt + +**/var/run/secrets/ +# Certs generated by testing +security/cmd/node_agent/na/cert_file +security/cmd/node_agent/na/pkey + +vendor +# Contains the built artifacts +out/ +etc/ +var/ +# Go compiled tests +*.test +# Profiles +*.prof +# MacOS extended attributes +._* +# MacOS Desktop Services Store +.DS_Store diff --git a/.go-version b/.go-version new file mode 100644 index 0000000..7a429d6 --- /dev/null +++ b/.go-version @@ -0,0 +1 @@ +1.24.6 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e69de29 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7f69aa8 --- /dev/null +++ b/Makefile @@ -0,0 +1,23 @@ +.PHONY: +fmt: + go fmt ./... + +.PHONY: +lint: + golangci-lint run + +.PHONY: +vet: + go vet ./... + +.PHONY: +dev: + go run ./cmd/main.go + +.PHONY: +build: + go build -o bin/app ./cmd/main.go + +.PHONY: +test: + go test -v ./... diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..e2a9a09 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.0.1-DEV diff --git a/cmd/collector/hertzbeat-collector.yaml b/cmd/collector/hertzbeat-collector.yaml new file mode 100644 index 0000000..9ef86d1 --- /dev/null +++ b/cmd/collector/hertzbeat-collector.yaml @@ -0,0 +1,9 @@ +collector: + info: + Name: hertzbeat-collector-go + version: 0.0.1-DEV + ip: 127.0.0.1 + port: 8080 + + log: + level: debug diff --git a/cmd/collector/main.go b/cmd/collector/main.go new file mode 100644 index 0000000..ebf420f --- /dev/null +++ b/cmd/collector/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "flag" + "os" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" +) + +var ( + conf string +) + +func init() { + flag.StringVar(&conf, "conf", "hertzbeat-collector.yaml", "path to config file") +} + +func main() { + + if err := collector.Bootstrap(conf); err != nil { + + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6ec1e8b --- /dev/null +++ b/go.mod @@ -0,0 +1,27 @@ +module hertzbeat.apache.org/hertzbeat-collector-go + +go 1.24.6 + +require ( + github.com/go-logr/logr v1.4.3 + github.com/go-logr/zapr v1.3.0 + github.com/prometheus/client_golang v1.23.0 + github.com/stretchr/testify v1.10.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.65.0 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/sys v0.33.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..88b898b --- /dev/null +++ b/go.sum @@ -0,0 +1,47 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= +github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc= +github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE= +github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/banner/banner.txt b/pkg/banner/banner.txt new file mode 100644 index 0000000..51c6457 --- /dev/null +++ b/pkg/banner/banner.txt @@ -0,0 +1,5 @@ + _ _ _ ____ _ + | | | | ___ _ __| |_ ___| __ ) ___ __ _| |_ + | |_| |/ _ \ '__| __|_ / _ \ / _ \/ _` | __| + | _ | __/ | | |_ / /| |_) | __/ (_| | |_ Name: {{ .CollectorName }} Port: {{ .ServerPort }} Pid: {{ .Pid }} + |_| |_|\___|_| \__/___|____/ \___|\__,_|\__| https://hertzbeat.apache.org/ diff --git a/pkg/banner/embed.go b/pkg/banner/embed.go new file mode 100644 index 0000000..d3ad750 --- /dev/null +++ b/pkg/banner/embed.go @@ -0,0 +1,54 @@ +package banner + +import ( + "embed" + "os" + "strconv" + "text/template" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/internel" +) + +//go:embed banner.txt +var EmbedLogo embed.FS + +type Banner struct { + server *internel.CollectorServer +} + +func New(server *internel.CollectorServer) *Banner { + return &Banner{server: server} +} + +type bannerVars struct { + CollectorName string + ServerPort string + Pid string +} + +func (b *Banner) PrintBanner(appName, port string) error { + data, err := EmbedLogo.ReadFile("banner.txt") + if err != nil { + b.server.Logger.Error(err, "read banner file failed") + return err + } + + tmpl, err := template.New("banner").Parse(string(data)) + if err != nil { + b.server.Logger.Error(err, "parse banner template failed") + return err + } + + vars := bannerVars{ + CollectorName: appName, + ServerPort: port, + Pid: strconv.Itoa(os.Getpid()), + } + + err = tmpl.Execute(os.Stdout, vars) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/collector/basic/.keep b/pkg/collector/basic/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/database/.keep b/pkg/collector/basic/database/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/dns/.keep b/pkg/collector/basic/dns/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/ftp/.keep b/pkg/collector/basic/ftp/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/http/.keep b/pkg/collector/basic/http/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/icmp/.keep b/pkg/collector/basic/icmp/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/imap/.keep b/pkg/collector/basic/imap/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/ipmi2/.keep b/pkg/collector/basic/ipmi2/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/jmx/.keep b/pkg/collector/basic/jmx/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/memcached/.keep b/pkg/collector/basic/memcached/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/modbus/.keep b/pkg/collector/basic/modbus/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/mqtt/.keep b/pkg/collector/basic/mqtt/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/nginx/.keep b/pkg/collector/basic/nginx/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/ntp/.keep b/pkg/collector/basic/ntp/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/plc/.keep b/pkg/collector/basic/plc/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/pop3/.keep b/pkg/collector/basic/pop3/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/prometheus/.keep b/pkg/collector/basic/prometheus/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/push/.keep b/pkg/collector/basic/push/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/redfish/.keep b/pkg/collector/basic/redfish/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/redis/.keep b/pkg/collector/basic/redis/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/registry/.keep b/pkg/collector/basic/registry/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/s7/.keep b/pkg/collector/basic/s7/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/script/.keep b/pkg/collector/basic/script/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/sd/.keep b/pkg/collector/basic/sd/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/smtp/.keep b/pkg/collector/basic/smtp/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/snmp/.keep b/pkg/collector/basic/snmp/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/ssh/.keep b/pkg/collector/basic/ssh/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/telnet/.keep b/pkg/collector/basic/telnet/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/udp/.keep b/pkg/collector/basic/udp/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/basic/websocket/.keep b/pkg/collector/basic/websocket/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/bootstrap.go b/pkg/collector/bootstrap.go new file mode 100644 index 0000000..9b46a90 --- /dev/null +++ b/pkg/collector/bootstrap.go @@ -0,0 +1,71 @@ +package collector + +import ( + "context" + "os" + "os/signal" + "syscall" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/banner" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/config" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/internel" +) + +func Bootstrap(confPath string) error { + + // Init collector server + server := internel.NewCollectorServer() + + server.Logger.Sugar().Debug("测试日志级别") + + // Load HertzBeat collector config + loader := config.New(confPath, server, nil) + cfg, err := loader.LoadConfig() + if err != nil { + server.Logger.Error(err, "load collector config failed") + return err + } + err = loader.ValidateConfig(cfg) + if err != nil { + server.Logger.Error(err, "validate collector config failed") + return err + } + + // render banner + err = banner.New(server).PrintBanner(cfg.Collector.Info.Name, cfg.Collector.Info.Port) + if err != nil { + server.Logger.Error(err, "print banner failed") + return err + } + + // Load collector job + + // check collector server + err = server.Validate() + if err != nil { + server.Logger.Error(err, "validate collector server failed") + return err + } + + // Start collector server + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + cancel() + }() + + err = server.Start(ctx) + if err != nil { + server.Logger.Error(err, "start collector server failed") + return err + } + + // shutdown collector server + _ = server.Close() + + return nil +} diff --git a/pkg/collector/common/cache/.keep b/pkg/collector/common/cache/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/common/collect/dispatch/.keep b/pkg/collector/common/collect/dispatch/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/common/collect/metrics/hertzbeat_metrics_collector.go b/pkg/collector/common/collect/metrics/hertzbeat_metrics_collector.go new file mode 100644 index 0000000..793c1f9 --- /dev/null +++ b/pkg/collector/common/collect/metrics/hertzbeat_metrics_collector.go @@ -0,0 +1,68 @@ +package metrics + +import ( + "fmt" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type Job struct { + App string + MonitorId int64 + Metadata map[string]string +} + +type HertzBeatMetricsCollector struct { + collectTotal *prometheus.CounterVec + collectDuration *prometheus.HistogramVec + once sync.Once +} + +func NewHertzBeatMetricsCollector() *HertzBeatMetricsCollector { + collector := &HertzBeatMetricsCollector{} + collector.once.Do(func() { + collector.collectTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "hertzbeat_collect_total", + Help: "The total number of collection tasks executed", + }, + []string{"status", "monitor_type", "monitor_id", "monitor_name", "monitor_target"}, + ) + collector.collectDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "hertzbeat_collect_duration", + Help: "The duration of collection task executions", + Buckets: prometheus.ExponentialBuckets(10, 2, 10), // Example buckets + }, + []string{"status", "monitor_type", "monitor_id", "monitor_name", "monitor_target"}, + ) + prometheus.MustRegister(collector.collectTotal, collector.collectDuration) + }) + return collector +} + +func (c *HertzBeatMetricsCollector) RecordCollectMetrics(job *Job, durationMillis int64, status string) { + if job == nil { + return + } + monitorName := "unknown" + monitorTarget := "unknown" + if job.Metadata != nil { + if v, ok := job.Metadata["instancename"]; ok { + monitorName = v + } + if v, ok := job.Metadata["instancehost"]; ok { + monitorTarget = v + } + } + labels := prometheus.Labels{ + "status": status, + "monitor_type": job.App, + "monitor_id": fmt.Sprintf("%d", job.MonitorId), + "monitor_name": monitorName, + "monitor_target": monitorTarget, + } + c.collectTotal.With(labels).Inc() + c.collectDuration.With(labels).Observe(float64(durationMillis)) +} diff --git a/pkg/collector/common/collect/strategy/strategy_factory.go b/pkg/collector/common/collect/strategy/strategy_factory.go new file mode 100644 index 0000000..ae1aa87 --- /dev/null +++ b/pkg/collector/common/collect/strategy/strategy_factory.go @@ -0,0 +1,39 @@ +package strategy + +import ( + "sync" +) + +// AbstractCollect interface +type AbstractCollect interface { + SupportProtocol() string +} + +// strategy container +var ( + collectStrategy = make(map[string]AbstractCollect) + mu sync.RWMutex +) + +// Register a collect strategy +// Example: registration in init() of each implementation file +// +// func init() { +// Register(&XXXCollect{}) +// } +func Register(collect AbstractCollect) { + + mu.Lock() + defer mu.Unlock() + + collectStrategy[collect.SupportProtocol()] = collect +} + +// Invoke returns the collect strategy for a protocol +func Invoke(protocol string) AbstractCollect { + + mu.RLock() + defer mu.RUnlock() + + return collectStrategy[protocol] +} diff --git a/pkg/collector/common/dispatcher/exporter/.keep b/pkg/collector/common/dispatcher/exporter/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/common/ssh/.keep b/pkg/collector/common/ssh/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/config/config.go b/pkg/collector/config/config.go new file mode 100644 index 0000000..2e251b3 --- /dev/null +++ b/pkg/collector/config/config.go @@ -0,0 +1,119 @@ +package config + +import ( + "context" + "errors" + "os" + + "gopkg.in/yaml.v3" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/internel" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" +) + +const ( + DefaultHertzBeatCollectorName = "hertzbeat-collector" + DefaultHertzBeatCollectorVersion = "0.0.1-DEV" +) + +type HookFunc func(c context.Context, server *internel.CollectorServer) error + +type Loader struct { + cfgPath string + logger logger.Logger + cancel context.CancelFunc + server *internel.CollectorServer + + hook HookFunc + + // todo file watcher + // watcher *fsnotify.Watcher +} + +func New(cfgPath string, server *internel.CollectorServer, f HookFunc) *Loader { + + return &Loader{ + cfgPath: cfgPath, + server: server, + logger: server.Logger.WithName("collector-config-loader"), + hook: f, + } +} + +func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) { + + ld.runHook() + + if ld.cfgPath == "" { + ld.logger.Info("collector-config-loader: path is empty") + return nil, errors.New("collector-config-loader: path is empty") + } + + file, err := os.Open(ld.cfgPath) + if err != nil { + return nil, err + } + defer func(file *os.File) { + err := file.Close() + if err != nil { + ld.logger.Error(err, "close config file failed") + } + }(file) + + var cfg types.CollectorConfig + decoder := yaml.NewDecoder(file) + if err := decoder.Decode(&cfg); err != nil { + ld.logger.Error(err, "decode config file failed") + return nil, err + } + + return &cfg, nil +} + +func (ld *Loader) ValidateConfig(cfg *types.CollectorConfig) error { + + if cfg == nil { + ld.logger.Sugar().Debug("collector-config-loader is nil") + return errors.New("collector-config-loader is nil") + } + + // other check + if cfg.Collector.Info.IP == "" { + ld.logger.Sugar().Debug("collector-config-loader ip is empty") + return errors.New("collector-config-loader ip is empty") + } + + if cfg.Collector.Info.Port == "" { + ld.logger.Sugar().Debug("collector-config-loader port is empty") + return errors.New("collector-config-loader port is empty") + } + + if cfg.Collector.Info.Name == "" { + ld.logger.Sugar().Debug("collector-config-loader: name is empty") + cfg.Collector.Info.Name = DefaultHertzBeatCollectorName + } + + if cfg.Collector.Info.Version == "" { + ld.logger.Sugar().Debug("collector-config-loader version is empty, use default version") + cfg.Collector.Info.Version = DefaultHertzBeatCollectorVersion + } + + return nil +} + +func (r *Loader) runHook() { + + if r.hook == nil { + return + } + + r.logger.Info("running hook") + c, cancel := context.WithCancel(context.TODO()) + r.cancel = cancel + go func(ctx context.Context) { + if err := r.hook(ctx, r.server); err != nil { + r.logger.Error(err, "hook error") + } + }(c) +} diff --git a/pkg/collector/config/config_test.go b/pkg/collector/config/config_test.go new file mode 100644 index 0000000..b16fc79 --- /dev/null +++ b/pkg/collector/config/config_test.go @@ -0,0 +1,45 @@ +package config + +import ( + "os" + "testing" +) + +func TestLoadConfig(t *testing.T) { + + content := `{ + "collector": { + "version": "1.0.0", + "ip": "127.0.0.1" + }, + "dispatcher": {} + }` + dumpfile, err := os.CreateTemp("", "collector_config_*.json") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer func(name string) { + err := os.Remove(name) + if err != nil { + t.Logf("failed to remove temp file: %v", err) + } + }(dumpfile.Name()) + if _, err := dumpfile.Write([]byte(content)); err != nil { + t.Fatalf("failed to write to temp file: %v", err) + } + err = dumpfile.Close() + if err != nil { + return + } + + cfg, err := LoadConfig(dumpfile.Name()) + if err != nil { + t.Fatalf("LoadConfig failed: %v", err) + } + if cfg.Collector.Version != "1.0.0" { + t.Errorf("expected version '1.0.0', got '%s'", cfg.Collector.Version) + } + if cfg.Collector.IP != "127.0.0.1" { + t.Errorf("expected ip '127.0.0.1', got '%s'", cfg.Collector.IP) + } +} diff --git a/pkg/collector/internel/server.go b/pkg/collector/internel/server.go new file mode 100644 index 0000000..08c7fb6 --- /dev/null +++ b/pkg/collector/internel/server.go @@ -0,0 +1,48 @@ +package internel + +import ( + "context" + "os" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" +) + +type Run interface { + Start(ctx context.Context) error + Close() error +} + +// CollectorServer HertzBeat Collector Server +type CollectorServer struct { + Logger logger.Logger +} + +func NewCollectorServer() *CollectorServer { + + return &CollectorServer{ + Logger: logger.DefaultLogger(os.Stdout, types.LogLevelInfo), + } +} + +func (s *CollectorServer) Start(ctx context.Context) error { + + s.Logger.Info("hi, starting collector server...") + + // Wait until done + <-ctx.Done() + + return nil +} + +func (s *CollectorServer) Validate() error { + + return nil +} + +// Shutdown the server hook +func (s *CollectorServer) Close() error { + + s.Logger.Info("collector server shutting down... bye!") + return nil +} diff --git a/pkg/collector/kafka/.keep b/pkg/collector/kafka/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/mongodb/.keep b/pkg/collector/mongodb/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/nebulagraph/.keep b/pkg/collector/nebulagraph/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/collector/rocketmq/.keep b/pkg/collector/rocketmq/.keep new file mode 100644 index 0000000..e69de29 diff --git a/pkg/constants/const.go b/pkg/constants/const.go new file mode 100644 index 0000000..8497882 --- /dev/null +++ b/pkg/constants/const.go @@ -0,0 +1,27 @@ +package constants + +// System constants +const ( + KeyWord = "keyword" +) + +// Service related constants +const ( + MongoDbAtlasModel = "mongodb-atlas" + + PostgreSQLUnReachAbleCode = "08001" + ZookeeperApp = "zookeeper" + ZookeeperEnviHeader = "Environment:" +) + +// API constants +const ( + ErrorMsg = "errorMsg" + ResponseTime = "responseTime" + StatusCode = "statusCode" +) + +// Function related constants +const ( + CollectorModule = "collector" +) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go new file mode 100644 index 0000000..43d0bd4 --- /dev/null +++ b/pkg/logger/logger.go @@ -0,0 +1,120 @@ +package logger + +import ( + "io" + "os" + + "github.com/go-logr/logr" + "github.com/go-logr/zapr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" +) + +type Logger struct { + logr.Logger + out io.Writer + logging *types.HertzBeatLogging + sugaredLogger *zap.SugaredLogger +} + +func NewLogger(w io.Writer, logging *types.HertzBeatLogging) Logger { + + logger := initZapLogger(w, logging, logging.Level[types.LogComponentHertzbeatDefault]) + + return Logger{ + Logger: zapr.NewLogger(logger), + out: w, + logging: logging, + sugaredLogger: logger.Sugar(), + } +} + +func FileLogger(file, name string, level types.LogLevel) Logger { + + writer, err := os.OpenFile(file, os.O_WRONLY, 0o666) + if err != nil { + panic(err) + } + + logging := types.DefaultHertzbeatLogging() + logger := initZapLogger(writer, logging, level) + + return Logger{ + Logger: zapr.NewLogger(logger).WithName(name), + logging: logging, + out: writer, + sugaredLogger: logger.Sugar(), + } +} + +func DefaultLogger(out io.Writer, level types.LogLevel) Logger { + + logging := types.DefaultHertzbeatLogging() + logger := initZapLogger(out, logging, level) + + return Logger{ + Logger: zapr.NewLogger(logger), + out: out, + logging: logging, + sugaredLogger: logger.Sugar(), + } +} + +// WithName returns a new Logger instance with the specified name element added +// to the Logger's name. Successive calls with WithName append additional +// suffixes to the Logger's name. It's strongly recommended that name segments +// contain only letters, digits, and hyphens (see the package documentation for +// more information). +func (l Logger) WithName(name string) Logger { + + logLevel := l.logging.Level[types.HertzbeatLogComponent(name)] + logger := initZapLogger(l.out, l.logging, logLevel) + + return Logger{ + Logger: zapr.NewLogger(logger).WithName(name), + logging: l.logging, + out: l.out, + sugaredLogger: logger.Sugar().Named(name), + } +} + +// WithValues returns a new Logger instance with additional key/value pairs. +// See Info for documentation on how key/value pairs work. +func (l Logger) WithValues(keysAndValues ...interface{}) Logger { + + l.Logger = l.Logger.WithValues(keysAndValues...) + return l +} + +// A Sugar wraps the base Logger functionality in a slower, but less +// verbose, API. Any Logger can be converted to a SugaredLogger with its Sugar +// method. +// +// Unlike the Logger, the SugaredLogger doesn't insist on structured logging. +// For each log level, it exposes four methods: +// +// - methods named after the log level for log.Print-style logging +// - methods ending in "w" for loosely-typed structured logging +// - methods ending in "f" for log.Printf-style logging +// - methods ending in "ln" for log.Println-style logging +// +// For example, the methods for InfoLevel are: +// +// Info(...any) Print-style logging +// Infow(...any) Structured logging (read as "info with") +// Infof(string, ...any) Printf-style logging +// Infoln(...any) Println-style logging +func (l Logger) Sugar() *zap.SugaredLogger { + + return l.sugaredLogger +} + +func initZapLogger(w io.Writer, logging *types.HertzBeatLogging, level types.LogLevel) *zap.Logger { + + parseLevel, _ := zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level))) + core := zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel)) + + return zap.New(core, zap.AddCaller()) +} diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go new file mode 100644 index 0000000..a57a242 --- /dev/null +++ b/pkg/logger/logger_test.go @@ -0,0 +1,101 @@ +package logger + +import ( + "errors" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" +) + +func TestZapLogLevel(t *testing.T) { + level, err := zapcore.ParseLevel("warn") + if err != nil { + t.Errorf("ParseLevel error %v", err) + } + zc := zap.NewDevelopmentConfig() + core := zapcore.NewCore(zapcore.NewConsoleEncoder(zc.EncoderConfig), zapcore.AddSync(os.Stdout), zap.NewAtomicLevelAt(level)) + zapLogger := zap.New(core, zap.AddCaller()) + log := zapLogger.Sugar() + log.Info("ok", "k1", "v1") + log.Error(errors.New("new error"), "error") +} + +func TestLogger(t *testing.T) { + logger := NewLogger(os.Stdout, types.DefaultHertzbeatLogging()) + logger.Info("kv msg", "key", "value") + logger.Sugar().Infof("template %s %d", "string", 123) + + logger.WithName(string(types.LogComponentHertzbeatCollector)).WithValues("runner", types.LogComponentHertzbeatCollector).Info("msg", "k", "v") + + defaultLogger := DefaultLogger(os.Stdout, types.LogLevelInfo) + assert.NotNil(t, defaultLogger.logging) + assert.NotNil(t, defaultLogger.sugaredLogger) + + fileLogger := FileLogger("/dev/stderr", "fl-test", types.LogLevelInfo) + assert.NotNil(t, fileLogger.logging) + assert.NotNil(t, fileLogger.sugaredLogger) +} + +func TestLoggerWithName(t *testing.T) { + originalStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + defer func() { + // Restore the original stdout and close the pipe + os.Stdout = originalStdout + err := w.Close() + require.NoError(t, err) + }() + + config := types.DefaultHertzbeatLogging() + config.Level[types.LogComponentHertzbeatCollector] = types.LogLevelDebug + + logger := NewLogger(os.Stdout, config).WithName(string(types.LogComponentHertzbeatCollector)) + logger.Info("info message") + logger.Sugar().Debugf("debug message") + + // Read from the pipe (captured stdout) + outputBytes := make([]byte, 200) + _, err := r.Read(outputBytes) + require.NoError(t, err) + capturedOutput := string(outputBytes) + assert.Contains(t, capturedOutput, string(types.LogComponentHertzbeatCollector)) + assert.Contains(t, capturedOutput, "info message") + assert.Contains(t, capturedOutput, "debug message") +} + +func TestLoggerSugarName(t *testing.T) { + originalStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + defer func() { + // Restore the original stdout and close the pipe + os.Stdout = originalStdout + err := w.Close() + require.NoError(t, err) + }() + + const logName = "loggerName" + + config := types.DefaultHertzbeatLogging() + config.Level[logName] = types.LogLevelDebug + + logger := NewLogger(os.Stdout, config).WithName(logName) + + logger.Sugar().Debugf("debugging message") + + // Read from the pipe (captured stdout) + outputBytes := make([]byte, 200) + _, err := r.Read(outputBytes) + require.NoError(t, err) + capturedOutput := string(outputBytes) + assert.Contains(t, capturedOutput, "debugging message", logName) +} diff --git a/pkg/types/config_types.go b/pkg/types/config_types.go new file mode 100644 index 0000000..d7291fc --- /dev/null +++ b/pkg/types/config_types.go @@ -0,0 +1,22 @@ +package types + +type CollectorConfig struct { + Collector CollectorSection `yaml:"collector"` +} + +type CollectorSection struct { + Info CollectorInfo `yaml:"info"` + Log CollectorLogConfig `yaml:"log"` + // Add Dispatcher if needed +} + +type CollectorInfo struct { + Name string `yaml:"name"` + Version string `yaml:"version"` + IP string `yaml:"ip"` + Port string `yaml:"port"` +} + +type CollectorLogConfig struct { + Level string `yaml:"level"` +} diff --git a/pkg/types/job_types.go b/pkg/types/job_types.go new file mode 100644 index 0000000..fbaa7c8 --- /dev/null +++ b/pkg/types/job_types.go @@ -0,0 +1,35 @@ +package types + +// hertzbeat Collect Job related types + +//type Job struct { +// ID int64 +// TenantID int64 +// MonitorID int64 +// Metadata map[string]string +// Labels map[string]string +// Annotations map[string]string +// Hide bool +// Category string +// App string +// Name map[string]string +// Help map[string]string +// HelpLink map[string]string +// Timestamp int64 +// DefaultInterval int64 +// Intervals *list.List +// IsCyclic bool +// Params []ParamDefine +// Metrics []Metrics +// Configmap []Configmap +// IsSd bool +// PrometheusProxyMode bool +// +// // Internal/temporary fields (not serialized) +// EnvConfigmaps map[string]Configmap +// DispatchTime int64 +// PriorMetrics *list.List +// ResponseDataTemp []MetricsData +// +// mu sync.Mutex +//} diff --git a/pkg/types/logging_types.go b/pkg/types/logging_types.go new file mode 100644 index 0000000..a549586 --- /dev/null +++ b/pkg/types/logging_types.go @@ -0,0 +1,65 @@ +package types + +// hertzbeat logging related types + +type LogLevel string + +const ( + // LogLevelTrace defines the "Trace" logging level. + LogLevelTrace LogLevel = "trace" + + // LogLevelDebug defines the "debug" logging level. + LogLevelDebug LogLevel = "debug" + + // LogLevelInfo defines the "Info" logging level. + LogLevelInfo LogLevel = "info" + + // LogLevelWarn defines the "Warn" logging level. + LogLevelWarn LogLevel = "warn" + + // LogLevelError defines the "Error" logging level. + LogLevelError LogLevel = "error" +) + +type HertzBeatLogging struct { + Level map[HertzbeatLogComponent]LogLevel `json:"level,omitempty"` +} + +type HertzbeatLogComponent string + +const ( + LogComponentHertzbeatDefault HertzbeatLogComponent = "default" + + LogComponentHertzbeatCollector HertzbeatLogComponent = "collector" +) + +func DefaultHertzbeatLogging() *HertzBeatLogging { + + return &HertzBeatLogging{ + Level: map[HertzbeatLogComponent]LogLevel{ + LogComponentHertzbeatDefault: LogLevelInfo, + }, + } +} + +func (logging *HertzBeatLogging) DefaultHertzBeatLoggingLevel(level LogLevel) LogLevel { + + if level != "" { + return level + } + + if logging.Level[LogComponentHertzbeatDefault] != "" { + + return logging.Level[LogComponentHertzbeatDefault] + } + + return LogLevelInfo +} + +func (logging *HertzBeatLogging) SetHertzBeatLoggingDefaults() { + + if logging != nil && logging.Level != nil && logging.Level[LogComponentHertzbeatDefault] == "" { + + logging.Level[LogComponentHertzbeatDefault] = LogLevelInfo + } +} diff --git a/pkg/util/.keep b/pkg/util/.keep new file mode 100644 index 0000000..e69de29 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
