This is an automated email from the ASF dual-hosted git repository.
robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/develop by this push:
new 9de1ee0e feat: add leader election for Discovery, Engine, Counter.(#1423)
9de1ee0e is described below
commit 9de1ee0e916b84be0165adb3789fa273d20a914a
Author: ThunGuo <[email protected]>
AuthorDate: Sun Mar 29 09:26:05 2026 +0800
feat: add leader election for Discovery, Engine, Counter.(#1423)
* feat: add leader election for Discovery and Engine components
* fix: separate renew/acquire SQL and stop informers on demotion
* feat: add leader election for Counter component
* fix: resolve copilot review suggestion
* fix: GormStore.Pool return value
---
go.mod | 28 ++--
go.sum | 27 ----
pkg/console/counter/component.go | 105 +++++++++++++--
pkg/console/counter/manager.go | 5 +
pkg/core/discovery/component.go | 99 ++++++++++++--
pkg/core/engine/component.go | 87 +++++++++++-
pkg/core/leader/db_source.go | 28 ++++
pkg/core/leader/leader.go | 285 +++++++++++++++++++++++++++++++++++++++
pkg/core/leader/leader_test.go | 202 +++++++++++++++++++++++++++
pkg/core/leader/model.go | 36 +++++
pkg/core/store/component.go | 42 ++++++
pkg/store/dbcommon/gorm_store.go | 7 +
12 files changed, 878 insertions(+), 73 deletions(-)
diff --git a/go.mod b/go.mod
index 9d90eed5..cd7ae39d 100644
--- a/go.mod
+++ b/go.mod
@@ -21,12 +21,8 @@ toolchain go1.24.11
require (
dubbo.apache.org/dubbo-go/v3 v3.3.0
- github.com/Masterminds/semver/v3 v3.2.1
- github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
- github.com/dubbogo/gost v1.14.0
+ github.com/dubbogo/go-zookeeper v1.0.4
github.com/duke-git/lancet/v2 v2.3.6
- github.com/emicklei/go-restful/v3 v3.12.2
- github.com/envoyproxy/go-control-plane v0.13.4
github.com/envoyproxy/go-control-plane/envoy v1.32.4
github.com/fullstorydev/grpcurl v1.9.1
github.com/gin-contrib/sessions v1.0.4
@@ -35,27 +31,18 @@ require (
github.com/go-co-op/gocron v1.9.0
github.com/go-logr/logr v1.4.2
github.com/go-logr/zapr v1.3.0
- github.com/goburrow/cache v0.1.4
github.com/golang/protobuf v1.5.4
- github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
- github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69
github.com/jhump/protoreflect v1.16.0
- github.com/kelseyhightower/envconfig v1.4.0
- github.com/mitchellh/mapstructure v1.5.0
github.com/nacos-group/nacos-sdk-go/v2 v2.3.5
github.com/onsi/ginkgo/v2 v2.22.1
github.com/onsi/gomega v1.36.2
github.com/pkg/errors v0.9.1
- github.com/prometheus/client_golang v1.20.5
- github.com/slok/go-http-metrics v0.11.0
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
- golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
- golang.org/x/sys v0.38.0
golang.org/x/text v0.31.0
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
@@ -68,7 +55,6 @@ require (
k8s.io/apimachinery v0.34.2
k8s.io/client-go v0.34.2
k8s.io/klog/v2 v2.130.1
- moul.io/zapgorm2 v1.3.0
sigs.k8s.io/controller-runtime v0.19.4
sigs.k8s.io/yaml v1.6.0
)
@@ -110,8 +96,8 @@ require (
)
require (
- cel.dev/expr v0.23.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
+ github.com/BurntSushi/toml v1.3.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protocompile v0.10.0 // indirect
github.com/bytedance/sonic v1.13.2 // indirect
@@ -120,8 +106,8 @@ require (
github.com/cloudwego/base64x v0.1.5 // indirect
github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc //
indirect
- github.com/dubbogo/go-zookeeper v1.0.4 // indirect
- github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 // indirect
+ github.com/dubbogo/gost v1.14.0 // indirect
+ github.com/emicklei/go-restful/v3 v3.12.2 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
@@ -138,6 +124,7 @@ require (
github.com/goccy/go-json v0.10.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
+ github.com/google/go-cmp v0.7.0 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/gorilla/context v1.1.2 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
@@ -162,6 +149,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
// indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 //
indirect
+ github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.57.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
@@ -175,17 +163,17 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.16.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
+ golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/oauth2 v0.28.0 // indirect
golang.org/x/sync v0.18.0 // indirect
+ golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.38.0 // indirect
- google.golang.org/genproto/googleapis/api
v0.0.0-20250324211829-b45e905df463 // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250324211829-b45e905df463 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
- gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
diff --git a/go.sum b/go.sum
index 265bc3a8..d2959599 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,3 @@
-cel.dev/expr v0.23.0 h1:wUb94w6OYQS4uXraxo9U+wUAs9jT47Xvl4iPgAwM2ss=
-cel.dev/expr v0.23.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
cloud.google.com/go v0.26.0/go.mod
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod
h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@@ -42,8 +40,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.3.2
h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod
h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod
h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/Masterminds/semver/v3 v3.2.1
h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
-github.com/Masterminds/semver/v3 v3.2.1/go.mod
h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Workiva/go-datastructures v1.0.52
h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod
h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
@@ -118,8 +114,6 @@ github.com/antihax/optional v1.0.0/go.mod
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
-github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
-github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod
h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/benbjohnson/clock v1.1.0/go.mod
h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -196,12 +190,8 @@ github.com/envoyproxy/go-control-plane
v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod
h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane
v0.9.9-0.20201210154907-fd9021fe5dad/go.mod
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane
v0.10.2-0.20220325020618-49ff273808a1/go.mod
h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
-github.com/envoyproxy/go-control-plane v0.13.4
h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
-github.com/envoyproxy/go-control-plane v0.13.4/go.mod
h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
github.com/envoyproxy/go-control-plane/envoy v1.32.4
h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod
h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
-github.com/envoyproxy/go-control-plane/ratelimit v0.1.0
h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
-github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod
h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.2.1
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
@@ -263,8 +253,6 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod
h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw
github.com/go-stack/stack v1.8.0/go.mod
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig/v3 v3.0.0
h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
-github.com/goburrow/cache v0.1.4
h1:As4KzO3hgmzPlnaMniZU9+VmoNYseUhuELbxy9mRBfw=
-github.com/goburrow/cache v0.1.4/go.mod
h1:cDFesZDnIlrHoNlMYqqMpCRawuXulgx+y7mXU8HZ+/c=
github.com/goccy/go-json v0.10.5
h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod
h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.3/go.mod
h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -380,8 +368,6 @@ github.com/hashicorp/logutils v1.0.0/go.mod
h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod
h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod
h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod
h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
-github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69
h1:umaj0TCQ9lWUUKy2DxAhEzPbwd0jnxiw1EI2z3FiILM=
-github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69/go.mod
h1:zdLK9ilQRSMjSeLKoZ4BqUfBT7jswTGF8zRlKEsiRXA=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.1.0
h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -400,7 +386,6 @@ github.com/jinzhu/copier v0.3.5
h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod
h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jinzhu/inflection v1.0.0
h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod
h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
-github.com/jinzhu/now v1.1.4/go.mod
h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod
h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod
h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
@@ -427,8 +412,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod
h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod
h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod
h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/pp v3.0.1+incompatible/go.mod
h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
-github.com/kelseyhightower/envconfig v1.4.0
h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
-github.com/kelseyhightower/envconfig v1.4.0/go.mod
h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod
h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod
h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -473,8 +456,6 @@ github.com/mitchellh/gox v0.4.0/go.mod
h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod
h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
-github.com/mitchellh/mapstructure v1.5.0
h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
-github.com/mitchellh/mapstructure v1.5.0/go.mod
h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -568,8 +549,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.2/go.mod
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod
h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0/go.mod
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
-github.com/slok/go-http-metrics v0.11.0
h1:ABJUpekCZSkQT1wQrFvS4kGbhea/w6ndFJaWJeh3zL0=
-github.com/slok/go-http-metrics v0.11.0/go.mod
h1:ZGKeYG1ET6TEJpQx18BqAJAvxw9jBAZXCHU7bWQqqAc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.0/go.mod
h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v1.6.4/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
@@ -669,7 +648,6 @@ go.uber.org/goleak v1.3.0/go.mod
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.5.0/go.mod
h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0/go.mod
h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
-go.uber.org/multierr v1.7.0/go.mod
h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod
h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod
h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
@@ -1023,8 +1001,6 @@ google.golang.org/genproto
v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
-google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463
h1:hE3bRWtU6uceqlh4fhrSnUyjKHMKB9KrTLLG+bc0ddM=
-google.golang.org/genproto/googleapis/api
v0.0.0-20250324211829-b45e905df463/go.mod
h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463
h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g=
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250324211829-b45e905df463/go.mod
h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.19.0/go.mod
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@@ -1102,7 +1078,6 @@ gorm.io/driver/postgres v1.6.0
h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
gorm.io/driver/postgres v1.6.0/go.mod
h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
gorm.io/driver/sqlite v1.5.7 h1:8NvsrhP0ifM7LX9G4zPB97NwovUakUxc+2V2uuf3Z1I=
gorm.io/driver/sqlite v1.5.7/go.mod
h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
-gorm.io/gorm v1.23.6/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs=
gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@@ -1124,8 +1099,6 @@ k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b
h1:MloQ9/bdJyIu9lb1PzujOP
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b/go.mod
h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts=
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y=
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod
h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
-moul.io/zapgorm2 v1.3.0 h1:+CzUTMIcnafd0d/BvBce8T4uPn6DQnpIrz64cyixlkk=
-moul.io/zapgorm2 v1.3.0/go.mod h1:nPVy6U9goFKHR4s+zfSo1xVFaoU7Qgd5DoCdOfzoCqs=
nullprogram.com/x/optparse v1.0.0/go.mod
h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/binaryregexp v0.2.0/go.mod
h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go
index 8c39cb3a..710518c1 100644
--- a/pkg/console/counter/component.go
+++ b/pkg/console/counter/component.go
@@ -18,10 +18,14 @@
package counter
import (
+ "context"
"fmt"
"math"
+ "sync/atomic"
+ storecfg "github.com/apache/dubbo-admin/pkg/config/store"
"github.com/apache/dubbo-admin/pkg/core/events"
+ "github.com/apache/dubbo-admin/pkg/core/leader"
"github.com/apache/dubbo-admin/pkg/core/logger"
meshresource
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
@@ -50,7 +54,13 @@ func (c *managerComponent) RequiredDependencies()
[]runtime.ComponentType {
}
type managerComponent struct {
- manager CounterManager
+ manager CounterManager
+ leaderElection *leader.LeaderElection
+ needsLeaderElection bool
+ isLeader atomic.Bool
+ bound atomic.Bool
+ storeRouter store.Router
+ bus events.EventBus
}
func (c *managerComponent) Type() runtime.ComponentType {
@@ -61,13 +71,45 @@ func (c *managerComponent) Order() int {
return math.MaxInt - 1
}
-func (c *managerComponent) Init(runtime.BuilderContext) error {
+func (c *managerComponent) Init(ctx runtime.BuilderContext) error {
mgr := NewCounterManager()
c.manager = mgr
+
+ // Memory store runs single-replica; leader election is not needed.
+ if ctx.Config().Store.Type == storecfg.Memory {
+ return nil
+ }
+
+ storeComponent, err := ctx.GetActivatedComponent(runtime.ResourceStore)
+ if err != nil {
+ logger.Warnf("counter: failed to get ResourceStore component,
skipping leader election: %v", err)
+ return nil
+ }
+ dbSrc, ok := storeComponent.(leader.DBSource)
+ if !ok {
+ return nil
+ }
+ db, hasDB := dbSrc.GetDB()
+ if !hasDB {
+ return nil
+ }
+ holderID, err := leader.GenerateHolderID()
+ if err != nil {
+ logger.Warnf("counter: failed to generate holder ID, skipping
leader election: %v", err)
+ return nil
+ }
+ le := leader.NewLeaderElection(db, string(ComponentType), holderID)
+ if err := le.EnsureTable(); err != nil {
+ logger.Warnf("counter: failed to ensure leader lease table:
%v", err)
+ return nil
+ }
+ c.leaderElection = le
+ c.needsLeaderElection = true
+ logger.Infof("counter: leader election initialized (holder: %s)",
holderID)
return nil
}
-func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
+func (c *managerComponent) Start(rt runtime.Runtime, ch <-chan struct{}) error
{
storeComponent, err := rt.GetComponent(runtime.ResourceStore)
if err != nil {
return err
@@ -76,10 +118,7 @@ func (c *managerComponent) Start(rt runtime.Runtime, _
<-chan struct{}) error {
if !ok {
return fmt.Errorf("component %s does not implement
store.Router", runtime.ResourceStore)
}
-
- if err := c.initializeCountsFromStore(storeRouter); err != nil {
- logger.Warnf("Failed to initialize counter manager from store:
%v", err)
- }
+ c.storeRouter = storeRouter
component, err := rt.GetComponent(runtime.EventBus)
if err != nil {
@@ -89,7 +128,57 @@ func (c *managerComponent) Start(rt runtime.Runtime, _
<-chan struct{}) error {
if !ok {
return fmt.Errorf("component %s does not implement
events.EventBus", runtime.EventBus)
}
- return c.manager.Bind(bus)
+ c.bus = bus
+
+ if !c.needsLeaderElection {
+ return c.startBusinessLogic()
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ <-ch
+ cancel()
+ }()
+
+ c.leaderElection.RunLeaderElection(ctx, ch,
+ func() { // onStartLeading
+ logger.Infof("counter: became leader, starting business
logic")
+ c.isLeader.Store(true)
+ if err := c.startBusinessLogic(); err != nil {
+ logger.Errorf("counter: failed to start
business logic: %v", err)
+ }
+ },
+ func() { // onStopLeading
+ logger.Warnf("counter: lost leadership, resetting
counters")
+ c.isLeader.Store(false)
+ c.manager.Reset()
+ },
+ )
+
+ return nil
+}
+
+// startBusinessLogic initializes counts from store and binds to EventBus.
+// When re-elected, it resets and re-initializes counts; Bind is called only once.
+func (c *managerComponent) startBusinessLogic() error {
+ c.manager.Reset()
+ // Wire up leader guard so event handler skips processing when not leader.
+ if c.needsLeaderElection {
+ cm := c.manager.(*counterManager)
+ cm.isLeader = &c.isLeader
+ }
+ if err := c.initializeCountsFromStore(c.storeRouter); err != nil {
+ logger.Warnf("Failed to initialize counter manager from store:
%v", err)
+ }
+ if !c.bound.Load() {
+ if err := c.manager.Bind(c.bus); err != nil {
+ return err
+ }
+ c.bound.Store(true)
+ }
+ return nil
}
func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router)
error {
diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go
index 581139fa..0c8fc0c0 100644
--- a/pkg/console/counter/manager.go
+++ b/pkg/console/counter/manager.go
@@ -19,6 +19,7 @@ package counter
import (
"fmt"
+ "sync/atomic"
"k8s.io/client-go/tools/cache"
@@ -59,6 +60,7 @@ type counterManager struct {
simpleCounters map[resmodel.ResourceKind]*Counter
distributionConfigs
map[resmodel.ResourceKind][]*distributionCounterConfig
distributionByType map[CounterType]*DistributionCounter
+ isLeader *atomic.Bool
}
func NewCounterManager() CounterManager {
@@ -193,6 +195,9 @@ func (cm *counterManager) Bind(bus events.EventBus) error {
}
func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event
events.Event) error {
+ if cm.isLeader != nil && !cm.isLeader.Load() {
+ return nil
+ }
logger.Debugf("CounterManager handling %s event, type: %s", kind,
event.Type())
if counter := cm.simpleCounters[kind]; counter != nil {
processSimpleCounter(counter, event)
diff --git a/pkg/core/discovery/component.go b/pkg/core/discovery/component.go
index 6cd27203..069c451f 100644
--- a/pkg/core/discovery/component.go
+++ b/pkg/core/discovery/component.go
@@ -18,18 +18,22 @@
package discovery
import (
+ "context"
"fmt"
"math"
"reflect"
+ "sync/atomic"
"github.com/duke-git/lancet/v2/slice"
"github.com/apache/dubbo-admin/pkg/common/bizerror"
"github.com/apache/dubbo-admin/pkg/config/discovery"
"github.com/apache/dubbo-admin/pkg/config/engine"
+ storecfg "github.com/apache/dubbo-admin/pkg/config/store"
"github.com/apache/dubbo-admin/pkg/core/controller"
"github.com/apache/dubbo-admin/pkg/core/discovery/subscriber"
"github.com/apache/dubbo-admin/pkg/core/events"
+ "github.com/apache/dubbo-admin/pkg/core/leader"
"github.com/apache/dubbo-admin/pkg/core/logger"
meshresource
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
@@ -51,10 +55,13 @@ var _ Component = &discoveryComponent{}
type Informers []controller.Informer
type discoveryComponent struct {
- configs []*discovery.Config
- informers map[string]Informers
- subscribers []events.Subscriber
- subscriptionMgr events.SubscriptionManager
+ configs []*discovery.Config
+ informers map[string]Informers
+ subscribers []events.Subscriber
+ subscriptionMgr events.SubscriptionManager
+ leaderElection *leader.LeaderElection
+ needsLeaderElection bool
+ subscribed atomic.Bool
}
func (d *discoveryComponent) RequiredDependencies() []runtime.ComponentType {
@@ -111,23 +118,91 @@ func (d *discoveryComponent) Init(ctx
runtime.BuilderContext) error {
if err != nil {
return err
}
+
+ // Memory store runs single-replica; leader election is not needed.
+ if ctx.Config().Store.Type == storecfg.Memory {
+ return nil
+ }
+
+ dbSrc, ok := storeComponent.(leader.DBSource)
+ if !ok {
+ return nil
+ }
+ db, hasDB := dbSrc.GetDB()
+ if !hasDB {
+ return nil
+ }
+ holderID, err := leader.GenerateHolderID()
+ if err != nil {
+ logger.Warnf("discovery: failed to generate holder ID, skipping
leader election: %v", err)
+ return nil
+ }
+ le := leader.NewLeaderElection(db, runtime.ResourceDiscovery, holderID)
+ if err := le.EnsureTable(); err != nil {
+ logger.Warnf("discovery: failed to ensure leader lease table:
%v", err)
+ return nil
+ }
+ d.leaderElection = le
+ d.needsLeaderElection = true
+ logger.Infof("discovery: leader election initialized (holder: %s)",
holderID)
return nil
}
func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{})
error {
- // 1. subscribe resource changed events
- for _, sub := range d.subscribers {
- err := d.subscriptionMgr.Subscribe(sub)
- if err != nil {
- return bizerror.Wrap(err, bizerror.EventError,
- fmt.Sprintf("subscriber %s can not subscribe
resource changed events", sub.Name()))
+ if !d.needsLeaderElection {
+ return d.startBusinessLogic(ch)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ <-ch
+ cancel()
+ }()
+
+ var leaderStopCh chan struct{}
+
+ d.leaderElection.RunLeaderElection(ctx, ch,
+ func() { // onStartLeading: create a fresh stopCh for this leadership term
+ leaderStopCh = make(chan struct{})
+ logger.Infof("discovery: became leader, starting
business logic")
+ if err := d.startBusinessLogic(leaderStopCh); err !=
nil {
+ logger.Errorf("discovery: failed to start
business logic: %v", err)
+ }
+ },
+ func() { // onStopLeading: stop informers from the current term
+ logger.Warnf("discovery: lost leadership, stopping
business logic")
+ if leaderStopCh != nil {
+ close(leaderStopCh)
+ leaderStopCh = nil
+ }
+ },
+ )
+
+ return nil
+}
+
+// startBusinessLogic starts subscribers and informers using the provided stopCh.
+// When stopCh is closed all informer goroutines will exit.
+func (d *discoveryComponent) startBusinessLogic(stopCh <-chan struct{}) error {
+ // 1. subscribe resource changed events (only once for the process lifetime)
+ if !d.subscribed.Load() {
+ for _, sub := range d.subscribers {
+ err := d.subscriptionMgr.Subscribe(sub)
+ if err != nil {
+ return bizerror.Wrap(err, bizerror.EventError,
+ fmt.Sprintf("subscriber %s can not
subscribe resource changed events", sub.Name()))
+ }
}
+ d.subscribed.Store(true)
}
+ // 2. start informers
for name, informers := range d.informers {
for _, informer := range informers {
- go informer.Run(ch)
+ go informer.Run(stopCh)
}
- logger.Infof("resource discvoery %s has started succesfully",
name)
+ logger.Infof("resource discovery %s has started successfully",
name)
}
return nil
}
diff --git a/pkg/core/engine/component.go b/pkg/core/engine/component.go
index 992eec8a..88dc293e 100644
--- a/pkg/core/engine/component.go
+++ b/pkg/core/engine/component.go
@@ -18,17 +18,21 @@
package engine
import (
+ "context"
"fmt"
"math"
"reflect"
+ "sync/atomic"
"k8s.io/client-go/tools/cache"
"github.com/apache/dubbo-admin/pkg/common/bizerror"
enginecfg "github.com/apache/dubbo-admin/pkg/config/engine"
+ storecfg "github.com/apache/dubbo-admin/pkg/config/store"
"github.com/apache/dubbo-admin/pkg/core/controller"
"github.com/apache/dubbo-admin/pkg/core/engine/subscriber"
"github.com/apache/dubbo-admin/pkg/core/events"
+ "github.com/apache/dubbo-admin/pkg/core/leader"
"github.com/apache/dubbo-admin/pkg/core/logger"
meshresource
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
"github.com/apache/dubbo-admin/pkg/core/runtime"
@@ -52,6 +56,9 @@ type engineComponent struct {
informers []controller.Informer
subscriptionManager events.SubscriptionManager
subscribers []events.Subscriber
+ leaderElection *leader.LeaderElection
+ needsLeaderElection bool
+ subscribed atomic.Bool
}
func newEngineComponent() Component {
@@ -103,7 +110,35 @@ func (e *engineComponent) Init(ctx runtime.BuilderContext)
error {
if err = e.initSubscribers(eventBus); err != nil {
return fmt.Errorf("init subscribers failed, %w", err)
}
- logger.Infof("resource engine %s has been inited successfully", e.name)
+
+ defer logger.Infof("resource engine %s has been inited successfully",
e.name)
+
+ // Memory store runs single-replica; leader election is not needed.
+ if ctx.Config().Store.Type == storecfg.Memory {
+ return nil
+ }
+
+ dbSrc, ok := storeComponent.(leader.DBSource)
+ if !ok {
+ return nil
+ }
+ db, hasDB := dbSrc.GetDB()
+ if !hasDB {
+ return nil
+ }
+ holderID, err := leader.GenerateHolderID()
+ if err != nil {
+ logger.Warnf("engine: failed to generate holder ID, skipping
leader election: %v", err)
+ return nil
+ }
+ le := leader.NewLeaderElection(db, runtime.ResourceEngine, holderID)
+ if err := le.EnsureTable(); err != nil {
+ logger.Warnf("engine: failed to ensure leader lease table: %v",
err)
+ return nil
+ }
+ e.leaderElection = le
+ e.needsLeaderElection = true
+ logger.Infof("engine: leader election initialized (holder: %s)",
holderID)
return nil
}
@@ -146,15 +181,55 @@ func (e *engineComponent) initSubscribers(eventbus
events.EventBus) error {
}
func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
- // 1. subscribe resource changed events
- for _, sub := range e.subscribers {
- if err := e.subscriptionManager.Subscribe(sub); err != nil {
- return fmt.Errorf("could not subscribe %s to eventbus,
%w", sub.Name(), err)
+ if !e.needsLeaderElection {
+ return e.startBusinessLogic(ch)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ <-ch
+ cancel()
+ }()
+
+ var leaderStopCh chan struct{}
+
+ e.leaderElection.RunLeaderElection(ctx, ch,
+ func() { // onStartLeading: create a fresh stopCh for this leadership term
+ leaderStopCh = make(chan struct{})
+ logger.Infof("engine: became leader, starting business
logic")
+ if err := e.startBusinessLogic(leaderStopCh); err !=
nil {
+ logger.Errorf("engine: failed to start business
logic: %v", err)
+ }
+ },
+ func() { // onStopLeading: stop informers from the current term
+ logger.Warnf("engine: lost leadership, stopping
business logic")
+ if leaderStopCh != nil {
+ close(leaderStopCh)
+ leaderStopCh = nil
+ }
+ },
+ )
+
+ return nil
+}
+
+// startBusinessLogic starts subscribers and informers using the provided stopCh.
+// When stopCh is closed all informer goroutines will exit.
+func (e *engineComponent) startBusinessLogic(stopCh <-chan struct{}) error {
+ // 1. subscribe resource changed events (only once for the process lifetime)
+ if !e.subscribed.Load() {
+ for _, sub := range e.subscribers {
+ if err := e.subscriptionManager.Subscribe(sub); err !=
nil {
+ return fmt.Errorf("could not subscribe %s to
eventbus, %w", sub.Name(), err)
+ }
}
+ e.subscribed.Store(true)
}
// 2. start informers
for _, informer := range e.informers {
- go informer.Run(ch)
+ go informer.Run(stopCh)
}
logger.Infof("resource engine %s has started successfully", e.name)
return nil
diff --git a/pkg/core/leader/db_source.go b/pkg/core/leader/db_source.go
new file mode 100644
index 00000000..66d37146
--- /dev/null
+++ b/pkg/core/leader/db_source.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import "gorm.io/gorm"
+
+// DBSource is an interface for components that provide access to a database
connection
+// Used by leader election to access the shared database for leader lease
management
+type DBSource interface {
+ // GetDB returns the shared database connection and a boolean
indicating if a DB is available
+ // Returns (db, true) if the component is backed by a database, (nil,
false) otherwise
+ GetDB() (*gorm.DB, bool)
+}
diff --git a/pkg/core/leader/leader.go b/pkg/core/leader/leader.go
new file mode 100644
index 00000000..e6f83147
--- /dev/null
+++ b/pkg/core/leader/leader.go
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ "gorm.io/gorm"
+
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+const (
+ // DefaultLeaseDuration is the default duration for a leader lease
+ DefaultLeaseDuration = 30 * time.Second
+ // DefaultRenewInterval is the default interval for renewing the lease
+ DefaultRenewInterval = 10 * time.Second
+ // DefaultAcquireRetryInterval is the default retry interval for
acquiring leadership
+ DefaultAcquireRetryInterval = 5 * time.Second
+)
+
+// LeaderElection manages leader election for distributed components
+// It uses database-based optimistic locking to ensure only one replica holds
the lease at any time
+type LeaderElection struct {
+ db *gorm.DB
+ component string
+ holderID string
+ leaseDuration time.Duration
+ renewInterval time.Duration
+ acquireRetry time.Duration
+ isLeader atomic.Bool
+ currentVersion int64
+ stopCh chan struct{}
+}
+
+// Option is a functional option for configuring LeaderElection
+type Option func(*LeaderElection)
+
+// WithLeaseDuration sets the lease duration
+func WithLeaseDuration(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.leaseDuration = d
+ }
+}
+
+// WithRenewInterval sets the renewal interval
+func WithRenewInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.renewInterval = d
+ }
+}
+
+// WithAcquireRetryInterval sets the acquisition retry interval
+func WithAcquireRetryInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.acquireRetry = d
+ }
+}
+
+// NewLeaderElection creates a new LeaderElection instance
+func NewLeaderElection(db *gorm.DB, component, holderID string, opts
...Option) *LeaderElection {
+ le := &LeaderElection{
+ db: db,
+ component: component,
+ holderID: holderID,
+ leaseDuration: DefaultLeaseDuration,
+ renewInterval: DefaultRenewInterval,
+ acquireRetry: DefaultAcquireRetryInterval,
+ stopCh: make(chan struct{}),
+ }
+
+ for _, opt := range opts {
+ opt(le)
+ }
+
+ return le
+}
+
+// EnsureTable creates the leader_leases table if it doesn't exist
+// This is idempotent and can be called multiple times
+func (le *LeaderElection) EnsureTable() error {
+ return le.db.AutoMigrate(&LeaderLease{})
+}
+
+// TryAcquire attempts to acquire the leader lease from an expired holder.
+// It only competes for leases that have already expired and does NOT renew an
+// existing self-held lease — use Renew for that.
+// Returns true if the current holder successfully acquired the lease.
+func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
+ now := time.Now()
+ expiresAt := now.Add(le.leaseDuration)
+
+ // Only take over an expired lease; never pre-empt an active holder.
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND expires_at < ?", le.component, now).
+ Updates(map[string]interface{}{
+ "holder_id": le.holderID,
+ "acquired_at": now,
+ "expires_at": expiresAt,
+ "version": gorm.Expr("version + 1"),
+ })
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to update lease for
component %s: %v", le.component, result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ // If the update succeeded (found a row to update)
+ if result.RowsAffected > 0 {
+ // Fetch the updated version
+ var lease LeaderLease
+ err := le.db.WithContext(ctx).
+ Where("component = ?", le.component).
+ First(&lease).Error
+ if err != nil {
+ logger.Warnf("leader election: failed to read back
updated lease for component %s: %v", le.component, err)
+ le.isLeader.Store(false)
+ return false
+ }
+ le.currentVersion = lease.Version
+ le.isLeader.Store(true)
+ return true
+ }
+
+ // No row was updated, try to insert a new record (lease doesn't exist)
+ result = le.db.WithContext(ctx).Create(&LeaderLease{
+ Component: le.component,
+ HolderID: le.holderID,
+ AcquiredAt: now,
+ ExpiresAt: expiresAt,
+ Version: 1,
+ })
+
+ if result.Error != nil {
+ // If insertion fails, it means another replica just created it
+ // This is expected in concurrent scenarios
+ logger.Debugf("leader election: failed to insert lease for
component %s (probably created by another replica): %v", le.component,
result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ le.currentVersion = 1
+ le.isLeader.Store(true)
+ return true
+}
+
+// Renew attempts to renew the current leader lease
+// Returns true if the renewal was successful
+func (le *LeaderElection) Renew(ctx context.Context) bool {
+ if !le.isLeader.Load() {
+ return false
+ }
+
+ now := time.Now()
+ expiresAt := now.Add(le.leaseDuration)
+
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND holder_id = ? AND version = ?",
le.component, le.holderID, le.currentVersion).
+ Updates(map[string]interface{}{
+ "acquired_at": now,
+ "expires_at": expiresAt,
+ "version": gorm.Expr("version + 1"),
+ })
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to renew lease for
component %s: %v", le.component, result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ if result.RowsAffected > 0 {
+ le.currentVersion++
+ return true
+ }
+
+ // Lease was lost (likely held by another replica now)
+ logger.Warnf("leader election: lost leader lease for component %s
(renewal failed, version mismatch)", le.component)
+ le.isLeader.Store(false)
+ return false
+}
+
+// Release releases the leader lease for this holder
+// This should be called when the holder voluntarily gives up leadership
+func (le *LeaderElection) Release(ctx context.Context) {
+ le.isLeader.Store(false)
+
+ expiresAt := time.Now().Add(-1 * time.Second) // Immediately expire the
lease
+
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND holder_id = ?", le.component,
le.holderID).
+ Update("expires_at", expiresAt)
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to release lease for
component %s: %v", le.component, result.Error)
+ }
+}
+
+// IsLeader returns true if this holder currently holds the leader lease
+func (le *LeaderElection) IsLeader() bool {
+ return le.isLeader.Load()
+}
+
+// RunLeaderElection runs the leader election loop
+// It blocks and runs onStartLeading/onStopLeading callbacks as leadership
changes
+// This is designed to be run in a separate goroutine
+func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan
struct{},
+ onStartLeading func(), onStopLeading func()) {
+
+ ticker := time.NewTicker(le.acquireRetry)
+ defer ticker.Stop()
+
+ renewTicker := time.NewTicker(le.renewInterval)
+ defer renewTicker.Stop()
+ renewTicker.Stop() // Don't start renewal ticker yet
+
+ isLeader := false
+
+ for {
+ select {
+ case <-ctx.Done():
+ if isLeader {
+ le.Release(context.Background())
+ onStopLeading()
+ }
+ return
+ case <-stopCh:
+ if isLeader {
+ le.Release(context.Background())
+ onStopLeading()
+ }
+ return
+ case <-ticker.C:
+ // Try to acquire leadership if not already leader
+ if !isLeader {
+ if le.TryAcquire(ctx) {
+ logger.Infof("leader election:
component %s acquired leadership (holder: %s)", le.component, le.holderID)
+ isLeader = true
+ renewTicker.Reset(le.renewInterval)
+ onStartLeading()
+ }
+ }
+ case <-renewTicker.C:
+ // Renew leadership if currently leader
+ if isLeader {
+ if !le.Renew(ctx) {
+ logger.Warnf("leader election:
component %s lost leadership (holder: %s)", le.component, le.holderID)
+ isLeader = false
+ renewTicker.Stop()
+ ticker.Reset(le.acquireRetry)
+ onStopLeading()
+ }
+ }
+ }
+ }
+}
+
+// GenerateHolderID generates a unique holder ID combining hostname and UUID
+func GenerateHolderID() (string, error) {
+ hostname, err := os.Hostname()
+ if err != nil {
+ hostname = "unknown"
+ }
+ return fmt.Sprintf("%s-%s", hostname, uuid.New().String()), nil
+}
diff --git a/pkg/core/leader/leader_test.go b/pkg/core/leader/leader_test.go
new file mode 100644
index 00000000..525a1bb2
--- /dev/null
+++ b/pkg/core/leader/leader_test.go
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gorm.io/driver/sqlite"
+ "gorm.io/gorm"
+)
+
+// setupTestDB creates an in-memory SQLite database for testing
+func setupTestDB(t *testing.T) *gorm.DB {
+ db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
+ require.NoError(t, err)
+ return db
+}
+
+func TestLeaderElection_EnsureTable(t *testing.T) {
+ db := setupTestDB(t)
+ le := NewLeaderElection(db, "test-component", "holder-1")
+
+ err := le.EnsureTable()
+ assert.NoError(t, err)
+
+ // Verify the table exists by creating another instance and checking
again
+ err = le.EnsureTable()
+ assert.NoError(t, err)
+}
+
+func TestLeaderElection_TryAcquire(t *testing.T) {
+ db := setupTestDB(t)
+ le := NewLeaderElection(db, "test-component", "holder-1")
+ err := le.EnsureTable()
+ require.NoError(t, err)
+
+ ctx := context.Background()
+
+ // First attempt should succeed (no lease exists)
+ acquired := le.TryAcquire(ctx)
+ assert.True(t, acquired)
+ assert.True(t, le.IsLeader())
+
+ // Verify the lease was created
+ var lease LeaderLease
+ result := db.Where("component = ?", "test-component").First(&lease)
+ assert.NoError(t, result.Error)
+ assert.Equal(t, "holder-1", lease.HolderID)
+}
+
+func TestLeaderElection_TryAcquire_AlreadyHeld(t *testing.T) {
+ db := setupTestDB(t)
+ le1 := NewLeaderElection(db, "test-component", "holder-1")
+ le2 := NewLeaderElection(db, "test-component", "holder-2")
+
+ err := le1.EnsureTable()
+ require.NoError(t, err)
+
+ ctx := context.Background()
+
+ // First holder acquires
+ acquired := le1.TryAcquire(ctx)
+ assert.True(t, acquired)
+
+ // Second holder tries to acquire (should fail because lease is not
expired)
+ acquired = le2.TryAcquire(ctx)
+ assert.False(t, acquired)
+ assert.False(t, le2.IsLeader())
+}
+
+func TestLeaderElection_Renew(t *testing.T) {
+ db := setupTestDB(t)
+ le := NewLeaderElection(db, "test-component", "holder-1",
+ WithLeaseDuration(1*time.Second),
+ WithRenewInterval(500*time.Millisecond))
+
+ err := le.EnsureTable()
+ require.NoError(t, err)
+
+ ctx := context.Background()
+
+ // Acquire the lease first
+ acquired := le.TryAcquire(ctx)
+ assert.True(t, acquired)
+
+ oldVersion := le.currentVersion
+
+ // Renew should succeed
+ renewed := le.Renew(ctx)
+ assert.True(t, renewed)
+ assert.Greater(t, le.currentVersion, oldVersion)
+
+ // Verify the lease was updated
+ var lease LeaderLease
+ result := db.Where("component = ?", "test-component").First(&lease)
+ assert.NoError(t, result.Error)
+ assert.Greater(t, lease.Version, int64(1))
+}
+
+func TestLeaderElection_Release(t *testing.T) {
+ db := setupTestDB(t)
+ le := NewLeaderElection(db, "test-component", "holder-1")
+ err := le.EnsureTable()
+ require.NoError(t, err)
+
+ ctx := context.Background()
+
+ // Acquire the lease
+ acquired := le.TryAcquire(ctx)
+ assert.True(t, acquired)
+
+ // Release it
+ le.Release(ctx)
+ assert.False(t, le.IsLeader())
+
+ // Verify the lease has expired
+ var lease LeaderLease
+ result := db.Where("component = ?", "test-component").First(&lease)
+ assert.NoError(t, result.Error)
+ assert.True(t, lease.ExpiresAt.Before(time.Now()))
+}
+
+func TestLeaderElection_Failover(t *testing.T) {
+ db := setupTestDB(t)
+ le1 := NewLeaderElection(db, "test-component", "holder-1",
+ WithLeaseDuration(100*time.Millisecond))
+ le2 := NewLeaderElection(db, "test-component", "holder-2",
+ WithLeaseDuration(100*time.Millisecond))
+
+ err := le1.EnsureTable()
+ require.NoError(t, err)
+
+ ctx := context.Background()
+
+ // First holder acquires
+ acquired := le1.TryAcquire(ctx)
+ assert.True(t, acquired)
+
+ // Second holder cannot acquire yet
+ acquired = le2.TryAcquire(ctx)
+ assert.False(t, acquired)
+
+ // Wait for lease to expire
+ time.Sleep(150 * time.Millisecond)
+
+ // Now second holder should be able to acquire
+ acquired = le2.TryAcquire(ctx)
+ assert.True(t, acquired)
+ assert.True(t, le2.IsLeader())
+}
+
+func TestGenerateHolderID(t *testing.T) {
+ id1, err := GenerateHolderID()
+ assert.NoError(t, err)
+ assert.NotEmpty(t, id1)
+
+ id2, err := GenerateHolderID()
+ assert.NoError(t, err)
+ assert.NotEmpty(t, id2)
+
+ // IDs should be different (due to UUID)
+ assert.NotEqual(t, id1, id2)
+}
+
+func TestLeaderElection_IsLeader(t *testing.T) {
+ db := setupTestDB(t)
+ le := NewLeaderElection(db, "test-component", "holder-1")
+ err := le.EnsureTable()
+ require.NoError(t, err)
+
+ ctx := context.Background()
+
+ // Initially not leader
+ assert.False(t, le.IsLeader())
+
+ // After acquiring, should be leader
+ le.TryAcquire(ctx)
+ assert.True(t, le.IsLeader())
+
+ // After releasing, should not be leader
+ le.Release(ctx)
+ assert.False(t, le.IsLeader())
+}
diff --git a/pkg/core/leader/model.go b/pkg/core/leader/model.go
new file mode 100644
index 00000000..a6180955
--- /dev/null
+++ b/pkg/core/leader/model.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import "time"
+
+// LeaderLease is the GORM model for the leader_leases table
+// It uses optimistic locking via the Version field to ensure atomic leader
elections
+type LeaderLease struct {
+ ID uint `gorm:"primaryKey;autoIncrement"`
+ Component string `gorm:"uniqueIndex;size:64;not null"`
+ HolderID string `gorm:"size:255;not null"`
+ AcquiredAt time.Time `gorm:"not null"`
+ ExpiresAt time.Time `gorm:"not null"`
+ Version int64 `gorm:"not null;default:0"`
+}
+
+// TableName returns the table name for LeaderLease
+func (LeaderLease) TableName() string {
+ return "leader_leases"
+}
diff --git a/pkg/core/store/component.go b/pkg/core/store/component.go
index 6eadc7f2..2cae0097 100644
--- a/pkg/core/store/component.go
+++ b/pkg/core/store/component.go
@@ -20,7 +20,11 @@ package store
import (
"fmt"
"math"
+ "reflect"
+ "gorm.io/gorm"
+
+ "github.com/apache/dubbo-admin/pkg/core/leader"
coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
"github.com/apache/dubbo-admin/pkg/core/runtime"
)
@@ -34,6 +38,12 @@ type Router interface {
ResourceKindRoute(k coremodel.ResourceKind) (ResourceStore, error)
}
+// poolProvider is an internal interface for stores that provide DB access
+// This avoids circular imports by not referencing dbcommon directly
+type poolProvider interface {
+ Pool() interface{} // Returns *ConnectionPool, but we don't type it to
avoid import
+}
+
// The Component interface is composed of both functional interfaces and
lifecycle interfaces
type Component interface {
runtime.Component
@@ -48,6 +58,9 @@ type storeComponent struct {
stores map[coremodel.ResourceKind]ManagedResourceStore
}
+// Compile-time check that storeComponent implements leader.DBSource interface
+var _ leader.DBSource = &storeComponent{}
+
func (sc *storeComponent) RequiredDependencies() []runtime.ComponentType {
return []runtime.ComponentType{
runtime.EventBus, // Store may need EventBus for event emission
@@ -109,3 +122,32 @@ func (sc *storeComponent) ResourceKindRoute(k
coremodel.ResourceKind) (ResourceS
return nil, fmt.Errorf("%s is not supported by store yet", k)
}
+
+// GetDB returns the shared DB connection if the underlying store is DB-backed
+// Implements the leader.DBSource interface
+func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
+ // Try to get DB from any store that has a Pool() method (all
GormStores share the same ConnectionPool)
+ for _, store := range sc.stores {
+ pp, ok := store.(poolProvider)
+ if !ok {
+ continue
+ }
+ pool := pp.Pool()
+ if pool == nil {
+ continue
+ }
+ // Use reflection to call GetDB() on the pool to avoid
importing dbcommon
+ poolVal := reflect.ValueOf(pool)
+ getDBMethod := poolVal.MethodByName("GetDB")
+ if !getDBMethod.IsValid() {
+ continue
+ }
+ result := getDBMethod.Call(nil)
+ if len(result) > 0 {
+ if db, ok := result[0].Interface().(*gorm.DB); ok {
+ return db, true
+ }
+ }
+ }
+ return nil, false
+}
diff --git a/pkg/store/dbcommon/gorm_store.go b/pkg/store/dbcommon/gorm_store.go
index e823189b..f72b4376 100644
--- a/pkg/store/dbcommon/gorm_store.go
+++ b/pkg/store/dbcommon/gorm_store.go
@@ -582,3 +582,10 @@ func (gs *GormStore) rebuildIndices() error {
logger.Infof("Rebuilt indices for %s: loaded %d resources",
gs.kind.ToString(), len(models))
return nil
}
+
+// Pool returns the connection pool for this store
+// Used by other components (e.g., leader election) that need direct DB access
+// Returns interface{} to satisfy the poolProvider interface in pkg/core/store
+func (gs *GormStore) Pool() interface{} {
+ return gs.pool
+}