This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch reconfiguration_scheduling in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 4116025c2b3bbf0e67c406c26b4e7a3ae5c37d9c Author: yuluo-yx <[email protected]> AuthorDate: Sun Aug 31 21:34:41 2025 +0800 feat: add somethings Signed-off-by: yuluo-yx <[email protected]> --- .gitignore | 2 + Makefile | 42 ++- api/message.pb.go | 319 ++++++++++++++++++++++ api/message.proto | 64 +++++ cmd/collector/hertzbeat-collector.yaml | 3 +- cmd/collector/main.go | 8 +- pkg/banner/banner.txt | 2 +- pkg/banner/embed.go | 2 + pkg/collector/bootstrap.go | 4 +- pkg/collector/common/transport/transport.go | 13 + pkg/collector/config/config.go | 13 +- pkg/collector/internel/server.go | 18 +- pkg/types/config_types.go | 7 +- pkg/types/{ => job}/job_types.go | 2 +- pkg/types/job/protocol/common_request_protocol.go | 16 ++ pkg/types/job/protocol/consul_sd_protocol.go | 33 +++ pkg/types/job/protocol/zookeeper_sd_protocol.go | 41 +++ 17 files changed, 563 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index ed854b5..987a170 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,8 @@ lintconfig.gen.json *.orig +bin/* + LICENSES.txt **/var/run/secrets/ diff --git a/Makefile b/Makefile index 7f69aa8..795a6a2 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + .PHONY: fmt: go fmt ./... @@ -14,10 +31,31 @@ vet: dev: go run ./cmd/main.go -.PHONY: +.PHONY: build +# build build: - go build -o bin/app ./cmd/main.go + @version=$$(cat VERSION); \ + mkdir -p bin/ && go build -ldflags "-X main.Version=$(VERSION)" -o ./bin/ ./... .PHONY: test: go test -v ./... + +.PHONY: init +# init env +init: + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest + go install github.com/go-kratos/kratos/cmd/protoc-gen-go-http/v2@latest + go install github.com/google/gnostic/cmd/protoc-gen-openapi@latest + +.PHONY: api +# generate api proto +api: API_PROTO_FILES := $(wildcard api/*.proto) +api: + protoc --proto_path=./api \ + --go_out=paths=source_relative:./api \ + --go-http_out=paths=source_relative:./api \ + --go-grpc_out=paths=source_relative:./api \ + --openapi_out=fq_schema_naming=true,default_response=false:. \ + $(API_PROTO_FILES) diff --git a/api/message.pb.go b/api/message.pb.go new file mode 100644 index 0000000..887f57a --- /dev/null +++ b/api/message.pb.go @@ -0,0 +1,319 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.8 +// protoc v5.29.3 +// source: message.proto + +package message + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MessageType int32 + +const ( + // heartbeat message + MessageType_HEARTBEAT MessageType = 0 + // collector go online to master message + MessageType_GO_ONLINE MessageType = 1 + // collector go offline to master message + MessageType_GO_OFFLINE MessageType = 2 + // collector go close to master + MessageType_GO_CLOSE MessageType = 3 + // issue cyclic collect task + MessageType_ISSUE_CYCLIC_TASK MessageType = 4 + // delete cyclic collect task + MessageType_DELETE_CYCLIC_TASK MessageType = 5 + // issue one-time collect task + MessageType_ISSUE_ONE_TIME_TASK MessageType = 6 + // response one-time collect data + MessageType_RESPONSE_ONE_TIME_TASK_DATA MessageType = 7 + // response cyclic collect data + MessageType_RESPONSE_CYCLIC_TASK_DATA MessageType = 8 + // response cyclic service discovery data + MessageType_RESPONSE_CYCLIC_TASK_SD_DATA MessageType = 9 +) + +// Enum value maps for MessageType. +var ( + MessageType_name = map[int32]string{ + 0: "HEARTBEAT", + 1: "GO_ONLINE", + 2: "GO_OFFLINE", + 3: "GO_CLOSE", + 4: "ISSUE_CYCLIC_TASK", + 5: "DELETE_CYCLIC_TASK", + 6: "ISSUE_ONE_TIME_TASK", + 7: "RESPONSE_ONE_TIME_TASK_DATA", + 8: "RESPONSE_CYCLIC_TASK_DATA", + 9: "RESPONSE_CYCLIC_TASK_SD_DATA", + } + MessageType_value = map[string]int32{ + "HEARTBEAT": 0, + "GO_ONLINE": 1, + "GO_OFFLINE": 2, + "GO_CLOSE": 3, + "ISSUE_CYCLIC_TASK": 4, + "DELETE_CYCLIC_TASK": 5, + "ISSUE_ONE_TIME_TASK": 6, + "RESPONSE_ONE_TIME_TASK_DATA": 7, + "RESPONSE_CYCLIC_TASK_DATA": 8, + "RESPONSE_CYCLIC_TASK_SD_DATA": 9, + } +) + +func (x MessageType) Enum() *MessageType { + p := new(MessageType) + *p = x + return p +} + +func (x MessageType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (MessageType) Descriptor() protoreflect.EnumDescriptor { + return file_message_proto_enumTypes[0].Descriptor() +} + +func (MessageType) Type() protoreflect.EnumType { + return &file_message_proto_enumTypes[0] +} + +func (x MessageType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use MessageType.Descriptor instead. +func (MessageType) EnumDescriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{0} +} + +type Direction int32 + +const ( + // request message + Direction_REQUEST Direction = 0 + // request response + Direction_RESPONSE Direction = 1 +) + +// Enum value maps for Direction. +var ( + Direction_name = map[int32]string{ + 0: "REQUEST", + 1: "RESPONSE", + } + Direction_value = map[string]int32{ + "REQUEST": 0, + "RESPONSE": 1, + } +) + +func (x Direction) Enum() *Direction { + p := new(Direction) + *p = x + return p +} + +func (x Direction) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Direction) Descriptor() protoreflect.EnumDescriptor { + return file_message_proto_enumTypes[1].Descriptor() +} + +func (Direction) Type() protoreflect.EnumType { + return &file_message_proto_enumTypes[1] +} + +func (x Direction) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Direction.Descriptor instead. +func (Direction) EnumDescriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{1} +} + +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + // collector identity + Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` + // message direction + Direction Direction `protobuf:"varint,2,opt,name=direction,proto3,enum=hertzbeat.apache.org.api.message.Direction" json:"direction,omitempty"` + // message type + Type MessageType `protobuf:"varint,3,opt,name=type,proto3,enum=hertzbeat.apache.org.api.message.MessageType" json:"type,omitempty"` + // message content + Msg []byte `protobuf:"bytes,4,opt,name=msg,proto3" json:"msg,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetIdentity() string { + if x != nil { + return x.Identity + } + return "" +} + +func (x *Message) GetDirection() Direction { + if x != nil { + return x.Direction + } + return Direction_REQUEST +} + +func (x *Message) GetType() MessageType { + if x != nil { + return x.Type + } + return MessageType_HEARTBEAT +} + +func (x *Message) GetMsg() []byte { + if x != nil { + return x.Msg + } + return nil +} + +var File_message_proto protoreflect.FileDescriptor + +const file_message_proto_rawDesc = "" + + "\n" + + "\rmessage.proto\x12 hertzbeat.apache.org.api.message\"\xc5\x01\n" + + "\aMessage\x12\x1a\n" + + "\bidentity\x18\x01 \x01(\tR\bidentity\x12I\n" + + "\tdirection\x18\x02 \x01(\x0e2+.hertzbeat.apache.org.api.message.DirectionR\tdirection\x12A\n" + + "\x04type\x18\x03 \x01(\x0e2-.hertzbeat.apache.org.api.message.MessageTypeR\x04type\x12\x10\n" + + "\x03msg\x18\x04 \x01(\fR\x03msg*\xf3\x01\n" + + "\vMessageType\x12\r\n" + + "\tHEARTBEAT\x10\x00\x12\r\n" + + "\tGO_ONLINE\x10\x01\x12\x0e\n" + + "\n" + + "GO_OFFLINE\x10\x02\x12\f\n" + + "\bGO_CLOSE\x10\x03\x12\x15\n" + + "\x11ISSUE_CYCLIC_TASK\x10\x04\x12\x16\n" + + "\x12DELETE_CYCLIC_TASK\x10\x05\x12\x17\n" + + "\x13ISSUE_ONE_TIME_TASK\x10\x06\x12\x1f\n" + + "\x1bRESPONSE_ONE_TIME_TASK_DATA\x10\a\x12\x1d\n" + + "\x19RESPONSE_CYCLIC_TASK_DATA\x10\b\x12 \n" + + "\x1cRESPONSE_CYCLIC_TASK_SD_DATA\x10\t*&\n" + + "\tDirection\x12\v\n" + + "\aREQUEST\x10\x00\x12\f\n" + + "\bRESPONSE\x10\x01BL\n" + + " org.apache.hertzbeat.api.messageZ(hertzbeat.apache.org/api/message;messageb\x06proto3" + +var ( + file_message_proto_rawDescOnce sync.Once + file_message_proto_rawDescData []byte +) + +func file_message_proto_rawDescGZIP() []byte { + file_message_proto_rawDescOnce.Do(func() { + file_message_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_message_proto_rawDesc), len(file_message_proto_rawDesc))) + }) + return file_message_proto_rawDescData +} + +var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_message_proto_goTypes = []any{ + (MessageType)(0), // 0: hertzbeat.apache.org.api.message.MessageType + (Direction)(0), // 1: hertzbeat.apache.org.api.message.Direction + (*Message)(nil), // 2: hertzbeat.apache.org.api.message.Message +} +var file_message_proto_depIdxs = []int32{ + 1, // 0: hertzbeat.apache.org.api.message.Message.direction:type_name -> hertzbeat.apache.org.api.message.Direction + 0, // 1: hertzbeat.apache.org.api.message.Message.type:type_name -> hertzbeat.apache.org.api.message.MessageType + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_message_proto_init() } +func file_message_proto_init() { + if File_message_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_message_proto_rawDesc), len(file_message_proto_rawDesc)), + NumEnums: 2, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_message_proto_goTypes, + DependencyIndexes: file_message_proto_depIdxs, + EnumInfos: file_message_proto_enumTypes, + MessageInfos: file_message_proto_msgTypes, + }.Build() + File_message_proto = out.File + file_message_proto_goTypes = nil + file_message_proto_depIdxs = nil +} diff --git a/api/message.proto b/api/message.proto new file mode 100644 index 0000000..6dbe282 --- /dev/null +++ b/api/message.proto @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package hertzbeat.apache.org.api.message; + +option go_package = "hertzbeat.apache.org/api/message;message"; +option java_package = "org.apache.hertzbeat.api.message"; + +message Message { + // collector identity + string identity = 1; + // message direction + Direction direction = 2; + // message type + MessageType type = 3; + // message content + bytes msg = 4; +} + +enum MessageType { + // heartbeat message + HEARTBEAT = 0; + // collector go online to master message + GO_ONLINE = 1; + // collector go offline to master message + GO_OFFLINE = 2; + // collector go close to master + GO_CLOSE = 3; + // issue cyclic collect task + ISSUE_CYCLIC_TASK = 4; + // delete cyclic collect task + DELETE_CYCLIC_TASK = 5; + // issue one-time collect task + ISSUE_ONE_TIME_TASK = 6; + // response one-time collect data + RESPONSE_ONE_TIME_TASK_DATA = 7; + // response cyclic collect data + RESPONSE_CYCLIC_TASK_DATA = 8; + // response cyclic service discovery data + RESPONSE_CYCLIC_TASK_SD_DATA = 9; +} + +enum Direction { + // request message + REQUEST = 0; + // request response + RESPONSE = 1; +} diff --git a/cmd/collector/hertzbeat-collector.yaml b/cmd/collector/hertzbeat-collector.yaml index 9ef86d1..77c1b1e 100644 --- a/cmd/collector/hertzbeat-collector.yaml +++ b/cmd/collector/hertzbeat-collector.yaml @@ -1,7 +1,6 @@ collector: info: - Name: hertzbeat-collector-go - version: 0.0.1-DEV + name: hertzbeat-collector-go ip: 127.0.0.1 port: 8080 diff --git a/cmd/collector/main.go b/cmd/collector/main.go index ebf420f..6481cb4 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -7,17 +7,19 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" ) +// go build -ldflags "-X main.Version=x.y.z" var ( - conf string + ConfPath string + Version string ) func init() { - flag.StringVar(&conf, "conf", "hertzbeat-collector.yaml", "path to config file") + flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to config file, eg: -conf ./hertzbeat-collector.yaml") } func main() { - if err := collector.Bootstrap(conf); err != nil { + if err := collector.Bootstrap(ConfPath, Version); err != nil { os.Exit(1) } diff --git a/pkg/banner/banner.txt b/pkg/banner/banner.txt index 51c6457..3a3161d 100644 --- a/pkg/banner/banner.txt +++ b/pkg/banner/banner.txt @@ -1,5 +1,5 @@ _ _ _ ____ _ | | | | ___ _ __| |_ ___| __ ) ___ __ _| |_ | |_| |/ _ \ '__| __|_ / _ \ / _ \/ _` | __| - | _ | __/ | | |_ / /| |_) | __/ (_| | |_ Name: {{ .CollectorName }} Port: {{ .ServerPort }} Pid: {{ .Pid }} + | _ | __/ | | |_ / /| |_) | __/ (_| | |_ Name: {{ .CollectorName }} Port: {{ .ServerPort }} Pid: {{ .Pid }} Version: {{ .Version }} |_| |_|\___|_| \__/___|____/ \___|\__,_|\__| https://hertzbeat.apache.org/ diff --git a/pkg/banner/embed.go b/pkg/banner/embed.go index d3ad750..d0e6c9d 100644 --- a/pkg/banner/embed.go +++ b/pkg/banner/embed.go @@ -24,6 +24,7 @@ type bannerVars struct { CollectorName string ServerPort string Pid string + Version string } func (b *Banner) PrintBanner(appName, port string) error { @@ -43,6 +44,7 @@ func (b *Banner) PrintBanner(appName, port string) error { CollectorName: appName, ServerPort: port, Pid: strconv.Itoa(os.Getpid()), + Version: b.server.Version, } err = tmpl.Execute(os.Stdout, vars) diff --git a/pkg/collector/bootstrap.go b/pkg/collector/bootstrap.go index 9b46a90..fec66c7 100644 --- a/pkg/collector/bootstrap.go +++ b/pkg/collector/bootstrap.go @@ -11,10 +11,10 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/internel" ) -func Bootstrap(confPath string) error { +func Bootstrap(confPath, version string) error { // Init collector server - server := internel.NewCollectorServer() + server := internel.NewCollectorServer(version) server.Logger.Sugar().Debug("测试日志级别") diff --git a/pkg/collector/common/transport/transport.go b/pkg/collector/common/transport/transport.go new file mode 100644 index 0000000..1deb96d --- /dev/null +++ b/pkg/collector/common/transport/transport.go @@ -0,0 +1,13 @@ +package transport + +type RemotingService interface { + Start() error + Shutdown() error + isStart() error +} + +type TransportClient interface { + RemotingService + + // todo add +} diff --git a/pkg/collector/config/config.go b/pkg/collector/config/config.go index 2e251b3..1a1c41b 100644 --- a/pkg/collector/config/config.go +++ b/pkg/collector/config/config.go @@ -13,8 +13,7 @@ import ( ) const ( - DefaultHertzBeatCollectorName = "hertzbeat-collector" - DefaultHertzBeatCollectorVersion = "0.0.1-DEV" + DefaultHertzBeatCollectorName = "hertzbeat-collector" ) type HookFunc func(c context.Context, server *internel.CollectorServer) error @@ -50,6 +49,11 @@ func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) { return nil, errors.New("collector-config-loader: path is empty") } + if _, err := os.Stat(ld.cfgPath); os.IsNotExist(err) { + ld.logger.Error(err, "collector-config-loader: file not exist", "path", ld.cfgPath) + return nil, err + } + file, err := os.Open(ld.cfgPath) if err != nil { return nil, err @@ -94,11 +98,6 @@ func (ld *Loader) ValidateConfig(cfg *types.CollectorConfig) error { cfg.Collector.Info.Name = DefaultHertzBeatCollectorName } - if cfg.Collector.Info.Version == "" { - ld.logger.Sugar().Debug("collector-config-loader version is empty, use default version") - cfg.Collector.Info.Version = DefaultHertzBeatCollectorVersion - } - return nil } diff --git a/pkg/collector/internel/server.go b/pkg/collector/internel/server.go index 08c7fb6..7bb79fd 100644 --- a/pkg/collector/internel/server.go +++ b/pkg/collector/internel/server.go @@ -8,6 +8,10 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" ) +const ( + DefaultHertzBeatCollectorVersion = "0.0.1-DEV" +) + type Run interface { Start(ctx context.Context) error Close() error @@ -15,13 +19,19 @@ type Run interface { // CollectorServer HertzBeat Collector Server type CollectorServer struct { - Logger logger.Logger + Version string + Logger logger.Logger } -func NewCollectorServer() *CollectorServer { +func NewCollectorServer(version string) *CollectorServer { + + if version == "" { + version = DefaultHertzBeatCollectorVersion + } return &CollectorServer{ - Logger: logger.DefaultLogger(os.Stdout, types.LogLevelInfo), + Version: version, + Logger: logger.DefaultLogger(os.Stdout, types.LogLevelInfo), } } @@ -40,7 +50,7 @@ func (s *CollectorServer) Validate() error { return nil } -// Shutdown the server hook +// Close Shutdown the server hook func (s *CollectorServer) Close() error { s.Logger.Info("collector server shutting down... bye!") diff --git a/pkg/types/config_types.go b/pkg/types/config_types.go index d7291fc..de0adf3 100644 --- a/pkg/types/config_types.go +++ b/pkg/types/config_types.go @@ -11,10 +11,9 @@ type CollectorSection struct { } type CollectorInfo struct { - Name string `yaml:"name"` - Version string `yaml:"version"` - IP string `yaml:"ip"` - Port string `yaml:"port"` + Name string `yaml:"name"` + IP string `yaml:"ip"` + Port string `yaml:"port"` } type CollectorLogConfig struct { diff --git a/pkg/types/job_types.go b/pkg/types/job/job_types.go similarity index 98% rename from pkg/types/job_types.go rename to pkg/types/job/job_types.go index fbaa7c8..f20bc23 100644 --- a/pkg/types/job_types.go +++ b/pkg/types/job/job_types.go @@ -1,4 +1,4 @@ -package types +package job // hertzbeat Collect Job related types diff --git a/pkg/types/job/protocol/common_request_protocol.go b/pkg/types/job/protocol/common_request_protocol.go new file mode 100644 index 0000000..6c00a23 --- /dev/null +++ b/pkg/types/job/protocol/common_request_protocol.go @@ -0,0 +1,16 @@ +package protocol + +import ( + "errors" +) + +var ( + ErrorInvalidHost = errors.New("invalid host") + ErrorInvalidPort = errors.New("invalid port") + ErrorInvalidURL = errors.New("invalid URL") +) + +type CommonRequestProtocol interface { + SetHost(host string) error + SetPort(port int) error +} diff --git a/pkg/types/job/protocol/consul_sd_protocol.go b/pkg/types/job/protocol/consul_sd_protocol.go new file mode 100644 index 0000000..b901404 --- /dev/null +++ b/pkg/types/job/protocol/consul_sd_protocol.go @@ -0,0 +1,33 @@ +package protocol + +import "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + +type ConsulSdProtocol struct { + Host string + Port string + + logger logger.Logger +} + +func NewConsulSdProtocol(host, port string, logger logger.Logger) *ConsulSdProtocol { + + return &ConsulSdProtocol{ + Host: host, + Port: port, + logger: logger, + } +} + +func (cp *ConsulSdProtocol) IsInvalid() error { + + if cp.Host == "" { + cp.logger.Error(ErrorInvalidHost, "consul sd protocol host is empty") + return ErrorInvalidHost + } + if cp.Port == "" { + cp.logger.Error(ErrorInvalidPort, "consul sd protocol port is empty") + return ErrorInvalidPort + } + + return nil +} diff --git a/pkg/types/job/protocol/zookeeper_sd_protocol.go b/pkg/types/job/protocol/zookeeper_sd_protocol.go new file mode 100644 index 0000000..3e3cef2 --- /dev/null +++ b/pkg/types/job/protocol/zookeeper_sd_protocol.go @@ -0,0 +1,41 @@ +package protocol + +import ( + "errors" + + "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" +) + +var ( + ErrorInvalidPathPrefix = errors.New("invalid path prefix") +) + +type ZookeeperSdProtocol struct { + URL string + PathPrefix string + + logger logger.Logger +} + +func NewZookeeperSdProtocol(url, pathPrefix string, logger logger.Logger) *ZookeeperSdProtocol { + + return &ZookeeperSdProtocol{ + URL: url, + PathPrefix: pathPrefix, + logger: logger, + } +} + +func (zp *ZookeeperSdProtocol) IsInvalid() error { + + if zp.URL == "" { + zp.logger.Error(ErrorInvalidURL, "zk sd protocol host is empty") + return ErrorInvalidURL + } + if zp.PathPrefix == "" { + zp.logger.Error(ErrorInvalidPathPrefix, "zk sd protocol port is empty") + return ErrorInvalidPathPrefix + } + + return nil +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
