This is an automated email from the ASF dual-hosted git repository.

robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9de1ee0e feat: add leader election for Discovery, Engine, 
Counter.(#1423)
9de1ee0e is described below

commit 9de1ee0e916b84be0165adb3789fa273d20a914a
Author: ThunGuo <[email protected]>
AuthorDate: Sun Mar 29 09:26:05 2026 +0800

    feat: add leader election for Discovery, Engine, Counter.(#1423)
    
    * feat: add leader election for Discovery and Engine components
    
    * fix: separate renew/acquire SQL and stop informers on demotion
    
    * feat: add leader election for Counter component
    
    * fix: resolve copilot review suggestion
    
    * fix: GormStore.Pool return value
---
 go.mod                           |  28 ++--
 go.sum                           |  27 ----
 pkg/console/counter/component.go | 105 +++++++++++++--
 pkg/console/counter/manager.go   |   5 +
 pkg/core/discovery/component.go  |  99 ++++++++++++--
 pkg/core/engine/component.go     |  87 +++++++++++-
 pkg/core/leader/db_source.go     |  28 ++++
 pkg/core/leader/leader.go        | 285 +++++++++++++++++++++++++++++++++++++++
 pkg/core/leader/leader_test.go   | 202 +++++++++++++++++++++++++++
 pkg/core/leader/model.go         |  36 +++++
 pkg/core/store/component.go      |  42 ++++++
 pkg/store/dbcommon/gorm_store.go |   7 +
 12 files changed, 878 insertions(+), 73 deletions(-)

diff --git a/go.mod b/go.mod
index 9d90eed5..cd7ae39d 100644
--- a/go.mod
+++ b/go.mod
@@ -21,12 +21,8 @@ toolchain go1.24.11
 
 require (
        dubbo.apache.org/dubbo-go/v3 v3.3.0
-       github.com/Masterminds/semver/v3 v3.2.1
-       github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
-       github.com/dubbogo/gost v1.14.0
+       github.com/dubbogo/go-zookeeper v1.0.4
        github.com/duke-git/lancet/v2 v2.3.6
-       github.com/emicklei/go-restful/v3 v3.12.2
-       github.com/envoyproxy/go-control-plane v0.13.4
        github.com/envoyproxy/go-control-plane/envoy v1.32.4
        github.com/fullstorydev/grpcurl v1.9.1
        github.com/gin-contrib/sessions v1.0.4
@@ -35,27 +31,18 @@ require (
        github.com/go-co-op/gocron v1.9.0
        github.com/go-logr/logr v1.4.2
        github.com/go-logr/zapr v1.3.0
-       github.com/goburrow/cache v0.1.4
        github.com/golang/protobuf v1.5.4
-       github.com/google/go-cmp v0.7.0
        github.com/google/uuid v1.6.0
        github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
-       github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69
        github.com/jhump/protoreflect v1.16.0
-       github.com/kelseyhightower/envconfig v1.4.0
-       github.com/mitchellh/mapstructure v1.5.0
        github.com/nacos-group/nacos-sdk-go/v2 v2.3.5
        github.com/onsi/ginkgo/v2 v2.22.1
        github.com/onsi/gomega v1.36.2
        github.com/pkg/errors v0.9.1
-       github.com/prometheus/client_golang v1.20.5
-       github.com/slok/go-http-metrics v0.11.0
        github.com/spf13/cobra v1.8.1
        github.com/stretchr/testify v1.10.0
        go.uber.org/multierr v1.11.0
        go.uber.org/zap v1.27.0
-       golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
-       golang.org/x/sys v0.38.0
        golang.org/x/text v0.31.0
        google.golang.org/grpc v1.73.0
        google.golang.org/protobuf v1.36.6
@@ -68,7 +55,6 @@ require (
        k8s.io/apimachinery v0.34.2
        k8s.io/client-go v0.34.2
        k8s.io/klog/v2 v2.130.1
-       moul.io/zapgorm2 v1.3.0
        sigs.k8s.io/controller-runtime v0.19.4
        sigs.k8s.io/yaml v1.6.0
 )
@@ -110,8 +96,8 @@ require (
 )
 
 require (
-       cel.dev/expr v0.23.0 // indirect
        filippo.io/edwards25519 v1.1.0 // indirect
+       github.com/BurntSushi/toml v1.3.2 // indirect
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/bufbuild/protocompile v0.10.0 // indirect
        github.com/bytedance/sonic v1.13.2 // indirect
@@ -120,8 +106,8 @@ require (
        github.com/cloudwego/base64x v0.1.5 // indirect
        github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f // indirect
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // 
indirect
-       github.com/dubbogo/go-zookeeper v1.0.4 // indirect
-       github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 // indirect
+       github.com/dubbogo/gost v1.14.0 // indirect
+       github.com/emicklei/go-restful/v3 v3.12.2 // indirect
        github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
        github.com/fxamacker/cbor/v2 v2.9.0 // indirect
        github.com/gabriel-vasile/mimetype v1.4.8 // indirect
@@ -138,6 +124,7 @@ require (
        github.com/goccy/go-json v0.10.5 // indirect
        github.com/gogo/protobuf v1.3.2 // indirect
        github.com/google/gnostic-models v0.7.0 // indirect
+       github.com/google/go-cmp v0.7.0 // indirect
        github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
        github.com/gorilla/context v1.1.2 // indirect
        github.com/gorilla/securecookie v1.1.2 // indirect
@@ -162,6 +149,7 @@ require (
        github.com/pelletier/go-toml/v2 v2.2.4 // indirect
        github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 
// indirect
        github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // 
indirect
+       github.com/prometheus/client_golang v1.20.5 // indirect
        github.com/prometheus/client_model v0.6.1 // indirect
        github.com/prometheus/common v0.57.0 // indirect
        github.com/prometheus/procfs v0.15.1 // indirect
@@ -175,17 +163,17 @@ require (
        go.yaml.in/yaml/v3 v3.0.4 // indirect
        golang.org/x/arch v0.16.0 // indirect
        golang.org/x/crypto v0.45.0 // indirect
+       golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
        golang.org/x/net v0.47.0 // indirect
        golang.org/x/oauth2 v0.28.0 // indirect
        golang.org/x/sync v0.18.0 // indirect
+       golang.org/x/sys v0.38.0 // indirect
        golang.org/x/term v0.37.0 // indirect
        golang.org/x/time v0.9.0 // indirect
        golang.org/x/tools v0.38.0 // indirect
-       google.golang.org/genproto/googleapis/api 
v0.0.0-20250324211829-b45e905df463 // indirect
        google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250324211829-b45e905df463 // indirect
        gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
        gopkg.in/inf.v0 v0.9.1 // indirect
-       gopkg.in/yaml.v2 v2.4.0 // indirect
        gopkg.in/yaml.v3 v3.0.1 // indirect
        k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
        k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
diff --git a/go.sum b/go.sum
index 265bc3a8..d2959599 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,3 @@
-cel.dev/expr v0.23.0 h1:wUb94w6OYQS4uXraxo9U+wUAs9jT47Xvl4iPgAwM2ss=
-cel.dev/expr v0.23.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
 cloud.google.com/go v0.26.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.38.0/go.mod 
h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@@ -42,8 +40,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/toml v1.3.2 
h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
 github.com/BurntSushi/toml v1.3.2/go.mod 
h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod 
h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/Masterminds/semver/v3 v3.2.1 
h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
-github.com/Masterminds/semver/v3 v3.2.1/go.mod 
h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
 github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/Workiva/go-datastructures v1.0.52 
h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
 github.com/Workiva/go-datastructures v1.0.52/go.mod 
h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
@@ -118,8 +114,6 @@ github.com/antihax/optional v1.0.0/go.mod 
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod 
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod 
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod 
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
-github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 
h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
-github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod 
h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
 github.com/benbjohnson/clock v1.1.0/go.mod 
h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -196,12 +190,8 @@ github.com/envoyproxy/go-control-plane 
v0.9.1-0.20191026205805-5f8ba28d4473/go.m
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod 
h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
 github.com/envoyproxy/go-control-plane 
v0.9.9-0.20201210154907-fd9021fe5dad/go.mod 
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/go-control-plane 
v0.10.2-0.20220325020618-49ff273808a1/go.mod 
h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
-github.com/envoyproxy/go-control-plane v0.13.4 
h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
-github.com/envoyproxy/go-control-plane v0.13.4/go.mod 
h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
 github.com/envoyproxy/go-control-plane/envoy v1.32.4 
h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
 github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod 
h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
-github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 
h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
-github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod 
h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/envoyproxy/protoc-gen-validate v1.2.1 
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
 github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod 
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
@@ -263,8 +253,6 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod 
h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw
 github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig/v3 v3.0.0 
h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
 github.com/go-task/slim-sprig/v3 v3.0.0/go.mod 
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
-github.com/goburrow/cache v0.1.4 
h1:As4KzO3hgmzPlnaMniZU9+VmoNYseUhuELbxy9mRBfw=
-github.com/goburrow/cache v0.1.4/go.mod 
h1:cDFesZDnIlrHoNlMYqqMpCRawuXulgx+y7mXU8HZ+/c=
 github.com/goccy/go-json v0.10.5 
h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
 github.com/goccy/go-json v0.10.5/go.mod 
h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
 github.com/godbus/dbus/v5 v5.0.3/go.mod 
h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -380,8 +368,6 @@ github.com/hashicorp/logutils v1.0.0/go.mod 
h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
 github.com/hashicorp/mdns v1.0.0/go.mod 
h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
 github.com/hashicorp/memberlist v0.1.3/go.mod 
h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
 github.com/hashicorp/serf v0.8.2/go.mod 
h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
-github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69 
h1:umaj0TCQ9lWUUKy2DxAhEzPbwd0jnxiw1EI2z3FiILM=
-github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69/go.mod 
h1:zdLK9ilQRSMjSeLKoZ4BqUfBT7jswTGF8zRlKEsiRXA=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod 
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/inconshreveable/mousetrap v1.1.0 
h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -400,7 +386,6 @@ github.com/jinzhu/copier v0.3.5 
h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
 github.com/jinzhu/copier v0.3.5/go.mod 
h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
 github.com/jinzhu/inflection v1.0.0 
h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
 github.com/jinzhu/inflection v1.0.0/go.mod 
h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
-github.com/jinzhu/now v1.1.4/go.mod 
h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
 github.com/jinzhu/now v1.1.5/go.mod 
h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod 
h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
@@ -427,8 +412,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod 
h1:SYymIcj16QtmaHHD7aYtjjsJG7V
 github.com/julienschmidt/httprouter v1.3.0/go.mod 
h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
 github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod 
h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
 github.com/k0kubun/pp v3.0.1+incompatible/go.mod 
h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
-github.com/kelseyhightower/envconfig v1.4.0 
h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
-github.com/kelseyhightower/envconfig v1.4.0/go.mod 
h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
 github.com/kisielk/errcheck v1.1.0/go.mod 
h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/errcheck v1.2.0/go.mod 
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/errcheck v1.5.0/go.mod 
h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -473,8 +456,6 @@ github.com/mitchellh/gox v0.4.0/go.mod 
h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
 github.com/mitchellh/iochan v1.0.0/go.mod 
h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
 github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod 
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/mitchellh/mapstructure v1.1.2/go.mod 
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
-github.com/mitchellh/mapstructure v1.5.0 
h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
-github.com/mitchellh/mapstructure v1.5.0/go.mod 
h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd 
h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -568,8 +549,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
 github.com/sirupsen/logrus v1.4.2/go.mod 
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.6.0/go.mod 
h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/sirupsen/logrus v1.7.0/go.mod 
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
-github.com/slok/go-http-metrics v0.11.0 
h1:ABJUpekCZSkQT1wQrFvS4kGbhea/w6ndFJaWJeh3zL0=
-github.com/slok/go-http-metrics v0.11.0/go.mod 
h1:ZGKeYG1ET6TEJpQx18BqAJAvxw9jBAZXCHU7bWQqqAc=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod 
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
 github.com/smartystreets/assertions v1.1.0/go.mod 
h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
 github.com/smartystreets/goconvey v1.6.4/go.mod 
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
@@ -669,7 +648,6 @@ go.uber.org/goleak v1.3.0/go.mod 
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.5.0/go.mod 
h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
 go.uber.org/multierr v1.6.0/go.mod 
h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
-go.uber.org/multierr v1.7.0/go.mod 
h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod 
h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod 
h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
@@ -1023,8 +1001,6 @@ google.golang.org/genproto 
v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
 google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod 
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod 
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod 
h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
-google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 
h1:hE3bRWtU6uceqlh4fhrSnUyjKHMKB9KrTLLG+bc0ddM=
-google.golang.org/genproto/googleapis/api 
v0.0.0-20250324211829-b45e905df463/go.mod 
h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 
h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g=
 google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250324211829-b45e905df463/go.mod 
h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
 google.golang.org/grpc v1.19.0/go.mod 
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@@ -1102,7 +1078,6 @@ gorm.io/driver/postgres v1.6.0 
h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
 gorm.io/driver/postgres v1.6.0/go.mod 
h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
 gorm.io/driver/sqlite v1.5.7 h1:8NvsrhP0ifM7LX9G4zPB97NwovUakUxc+2V2uuf3Z1I=
 gorm.io/driver/sqlite v1.5.7/go.mod 
h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
-gorm.io/gorm v1.23.6/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
 gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs=
 gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@@ -1124,8 +1099,6 @@ k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b 
h1:MloQ9/bdJyIu9lb1PzujOP
 k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b/go.mod 
h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts=
 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 
h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y=
 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod 
h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
-moul.io/zapgorm2 v1.3.0 h1:+CzUTMIcnafd0d/BvBce8T4uPn6DQnpIrz64cyixlkk=
-moul.io/zapgorm2 v1.3.0/go.mod h1:nPVy6U9goFKHR4s+zfSo1xVFaoU7Qgd5DoCdOfzoCqs=
 nullprogram.com/x/optparse v1.0.0/go.mod 
h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
 rsc.io/binaryregexp v0.2.0/go.mod 
h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go
index 8c39cb3a..710518c1 100644
--- a/pkg/console/counter/component.go
+++ b/pkg/console/counter/component.go
@@ -18,10 +18,14 @@
 package counter
 
 import (
+       "context"
        "fmt"
        "math"
+       "sync/atomic"
 
+       storecfg "github.com/apache/dubbo-admin/pkg/config/store"
        "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/leader"
        "github.com/apache/dubbo-admin/pkg/core/logger"
        meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
        resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
@@ -50,7 +54,13 @@ func (c *managerComponent) RequiredDependencies() 
[]runtime.ComponentType {
 }
 
 type managerComponent struct {
-       manager CounterManager
+       manager             CounterManager
+       leaderElection      *leader.LeaderElection
+       needsLeaderElection bool
+       isLeader            atomic.Bool
+       bound               atomic.Bool
+       storeRouter         store.Router
+       bus                 events.EventBus
 }
 
 func (c *managerComponent) Type() runtime.ComponentType {
@@ -61,13 +71,45 @@ func (c *managerComponent) Order() int {
        return math.MaxInt - 1
 }
 
-func (c *managerComponent) Init(runtime.BuilderContext) error {
+func (c *managerComponent) Init(ctx runtime.BuilderContext) error {
        mgr := NewCounterManager()
        c.manager = mgr
+
+       // Memory store runs single-replica; leader election is not needed.
+       if ctx.Config().Store.Type == storecfg.Memory {
+               return nil
+       }
+
+       storeComponent, err := ctx.GetActivatedComponent(runtime.ResourceStore)
+       if err != nil {
+               logger.Warnf("counter: failed to get ResourceStore component, 
skipping leader election: %v", err)
+               return nil
+       }
+       dbSrc, ok := storeComponent.(leader.DBSource)
+       if !ok {
+               return nil
+       }
+       db, hasDB := dbSrc.GetDB()
+       if !hasDB {
+               return nil
+       }
+       holderID, err := leader.GenerateHolderID()
+       if err != nil {
+               logger.Warnf("counter: failed to generate holder ID, skipping 
leader election: %v", err)
+               return nil
+       }
+       le := leader.NewLeaderElection(db, string(ComponentType), holderID)
+       if err := le.EnsureTable(); err != nil {
+               logger.Warnf("counter: failed to ensure leader lease table: 
%v", err)
+               return nil
+       }
+       c.leaderElection = le
+       c.needsLeaderElection = true
+       logger.Infof("counter: leader election initialized (holder: %s)", 
holderID)
        return nil
 }
 
-func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
+func (c *managerComponent) Start(rt runtime.Runtime, ch <-chan struct{}) error 
{
        storeComponent, err := rt.GetComponent(runtime.ResourceStore)
        if err != nil {
                return err
@@ -76,10 +118,7 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ 
<-chan struct{}) error {
        if !ok {
                return fmt.Errorf("component %s does not implement 
store.Router", runtime.ResourceStore)
        }
-
-       if err := c.initializeCountsFromStore(storeRouter); err != nil {
-               logger.Warnf("Failed to initialize counter manager from store: 
%v", err)
-       }
+       c.storeRouter = storeRouter
 
        component, err := rt.GetComponent(runtime.EventBus)
        if err != nil {
@@ -89,7 +128,57 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ 
<-chan struct{}) error {
        if !ok {
                return fmt.Errorf("component %s does not implement 
events.EventBus", runtime.EventBus)
        }
-       return c.manager.Bind(bus)
+       c.bus = bus
+
+       if !c.needsLeaderElection {
+               return c.startBusinessLogic()
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       go func() {
+               <-ch
+               cancel()
+       }()
+
+       c.leaderElection.RunLeaderElection(ctx, ch,
+               func() { // onStartLeading
+                       logger.Infof("counter: became leader, starting business 
logic")
+                       c.isLeader.Store(true)
+                       if err := c.startBusinessLogic(); err != nil {
+                               logger.Errorf("counter: failed to start 
business logic: %v", err)
+                       }
+               },
+               func() { // onStopLeading
+                       logger.Warnf("counter: lost leadership, resetting 
counters")
+                       c.isLeader.Store(false)
+                       c.manager.Reset()
+               },
+       )
+
+       return nil
+}
+
+// startBusinessLogic initializes counts from store and binds to EventBus.
+// When re-elected, it resets and re-initializes counts; Bind is called only 
once.
+func (c *managerComponent) startBusinessLogic() error {
+       c.manager.Reset()
+       // Wire up leader guard so event handler skips processing when not 
leader.
+       if c.needsLeaderElection {
+               cm := c.manager.(*counterManager)
+               cm.isLeader = &c.isLeader
+       }
+       if err := c.initializeCountsFromStore(c.storeRouter); err != nil {
+               logger.Warnf("Failed to initialize counter manager from store: 
%v", err)
+       }
+       if !c.bound.Load() {
+               if err := c.manager.Bind(c.bus); err != nil {
+                       return err
+               }
+               c.bound.Store(true)
+       }
+       return nil
 }
 
 func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router) 
error {
diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go
index 581139fa..0c8fc0c0 100644
--- a/pkg/console/counter/manager.go
+++ b/pkg/console/counter/manager.go
@@ -19,6 +19,7 @@ package counter
 
 import (
        "fmt"
+       "sync/atomic"
 
        "k8s.io/client-go/tools/cache"
 
@@ -59,6 +60,7 @@ type counterManager struct {
        simpleCounters      map[resmodel.ResourceKind]*Counter
        distributionConfigs 
map[resmodel.ResourceKind][]*distributionCounterConfig
        distributionByType  map[CounterType]*DistributionCounter
+       isLeader            *atomic.Bool
 }
 
 func NewCounterManager() CounterManager {
@@ -193,6 +195,9 @@ func (cm *counterManager) Bind(bus events.EventBus) error {
 }
 
 func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event 
events.Event) error {
+       if cm.isLeader != nil && !cm.isLeader.Load() {
+               return nil
+       }
        logger.Debugf("CounterManager handling %s event, type: %s", kind, 
event.Type())
        if counter := cm.simpleCounters[kind]; counter != nil {
                processSimpleCounter(counter, event)
diff --git a/pkg/core/discovery/component.go b/pkg/core/discovery/component.go
index 6cd27203..069c451f 100644
--- a/pkg/core/discovery/component.go
+++ b/pkg/core/discovery/component.go
@@ -18,18 +18,22 @@
 package discovery
 
 import (
+       "context"
        "fmt"
        "math"
        "reflect"
+       "sync/atomic"
 
        "github.com/duke-git/lancet/v2/slice"
 
        "github.com/apache/dubbo-admin/pkg/common/bizerror"
        "github.com/apache/dubbo-admin/pkg/config/discovery"
        "github.com/apache/dubbo-admin/pkg/config/engine"
+       storecfg "github.com/apache/dubbo-admin/pkg/config/store"
        "github.com/apache/dubbo-admin/pkg/core/controller"
        "github.com/apache/dubbo-admin/pkg/core/discovery/subscriber"
        "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/leader"
        "github.com/apache/dubbo-admin/pkg/core/logger"
        meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
        coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
@@ -51,10 +55,13 @@ var _ Component = &discoveryComponent{}
 type Informers []controller.Informer
 
 type discoveryComponent struct {
-       configs         []*discovery.Config
-       informers       map[string]Informers
-       subscribers     []events.Subscriber
-       subscriptionMgr events.SubscriptionManager
+       configs             []*discovery.Config
+       informers           map[string]Informers
+       subscribers         []events.Subscriber
+       subscriptionMgr     events.SubscriptionManager
+       leaderElection      *leader.LeaderElection
+       needsLeaderElection bool
+       subscribed          atomic.Bool
 }
 
 func (d *discoveryComponent) RequiredDependencies() []runtime.ComponentType {
@@ -111,23 +118,91 @@ func (d *discoveryComponent) Init(ctx 
runtime.BuilderContext) error {
        if err != nil {
                return err
        }
+
+       // Memory store runs single-replica; leader election is not needed.
+       if ctx.Config().Store.Type == storecfg.Memory {
+               return nil
+       }
+
+       dbSrc, ok := storeComponent.(leader.DBSource)
+       if !ok {
+               return nil
+       }
+       db, hasDB := dbSrc.GetDB()
+       if !hasDB {
+               return nil
+       }
+       holderID, err := leader.GenerateHolderID()
+       if err != nil {
+               logger.Warnf("discovery: failed to generate holder ID, skipping 
leader election: %v", err)
+               return nil
+       }
+       le := leader.NewLeaderElection(db, runtime.ResourceDiscovery, holderID)
+       if err := le.EnsureTable(); err != nil {
+               logger.Warnf("discovery: failed to ensure leader lease table: 
%v", err)
+               return nil
+       }
+       d.leaderElection = le
+       d.needsLeaderElection = true
+       logger.Infof("discovery: leader election initialized (holder: %s)", 
holderID)
        return nil
 }
 
 func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) 
error {
-       // 1. subscribe resource changed events
-       for _, sub := range d.subscribers {
-               err := d.subscriptionMgr.Subscribe(sub)
-               if err != nil {
-                       return bizerror.Wrap(err, bizerror.EventError,
-                               fmt.Sprintf("subscriber %s can not subscribe 
resource changed events", sub.Name()))
+       if !d.needsLeaderElection {
+               return d.startBusinessLogic(ch)
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       go func() {
+               <-ch
+               cancel()
+       }()
+
+       var leaderStopCh chan struct{}
+
+       d.leaderElection.RunLeaderElection(ctx, ch,
+               func() { // onStartLeading: create a fresh stopCh for this 
leadership term
+                       leaderStopCh = make(chan struct{})
+                       logger.Infof("discovery: became leader, starting 
business logic")
+                       if err := d.startBusinessLogic(leaderStopCh); err != 
nil {
+                               logger.Errorf("discovery: failed to start 
business logic: %v", err)
+                       }
+               },
+               func() { // onStopLeading: stop informers from the current term
+                       logger.Warnf("discovery: lost leadership, stopping 
business logic")
+                       if leaderStopCh != nil {
+                               close(leaderStopCh)
+                               leaderStopCh = nil
+                       }
+               },
+       )
+
+       return nil
+}
+
+// startBusinessLogic starts subscribers and informers using the provided 
stopCh.
+// When stopCh is closed all informer goroutines will exit.
+func (d *discoveryComponent) startBusinessLogic(stopCh <-chan struct{}) error {
+       // 1. subscribe resource changed events (only once for the process 
lifetime)
+       if !d.subscribed.Load() {
+               for _, sub := range d.subscribers {
+                       err := d.subscriptionMgr.Subscribe(sub)
+                       if err != nil {
+                               return bizerror.Wrap(err, bizerror.EventError,
+                                       fmt.Sprintf("subscriber %s can not 
subscribe resource changed events", sub.Name()))
+                       }
                }
+               d.subscribed.Store(true)
        }
+       // 2. start informers
        for name, informers := range d.informers {
                for _, informer := range informers {
-                       go informer.Run(ch)
+                       go informer.Run(stopCh)
                }
-               logger.Infof("resource discvoery %s has started succesfully", 
name)
+               logger.Infof("resource discovery %s has started successfully", 
name)
        }
        return nil
 }
diff --git a/pkg/core/engine/component.go b/pkg/core/engine/component.go
index 992eec8a..88dc293e 100644
--- a/pkg/core/engine/component.go
+++ b/pkg/core/engine/component.go
@@ -18,17 +18,21 @@
 package engine
 
 import (
+       "context"
        "fmt"
        "math"
        "reflect"
+       "sync/atomic"
 
        "k8s.io/client-go/tools/cache"
 
        "github.com/apache/dubbo-admin/pkg/common/bizerror"
        enginecfg "github.com/apache/dubbo-admin/pkg/config/engine"
+       storecfg "github.com/apache/dubbo-admin/pkg/config/store"
        "github.com/apache/dubbo-admin/pkg/core/controller"
        "github.com/apache/dubbo-admin/pkg/core/engine/subscriber"
        "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/leader"
        "github.com/apache/dubbo-admin/pkg/core/logger"
        meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
        "github.com/apache/dubbo-admin/pkg/core/runtime"
@@ -52,6 +56,9 @@ type engineComponent struct {
        informers           []controller.Informer
        subscriptionManager events.SubscriptionManager
        subscribers         []events.Subscriber
+       leaderElection      *leader.LeaderElection
+       needsLeaderElection bool
+       subscribed          atomic.Bool
 }
 
 func newEngineComponent() Component {
@@ -103,7 +110,35 @@ func (e *engineComponent) Init(ctx runtime.BuilderContext) 
error {
        if err = e.initSubscribers(eventBus); err != nil {
                return fmt.Errorf("init subscribers failed, %w", err)
        }
-       logger.Infof("resource engine %s has been inited successfully", e.name)
+
+       defer logger.Infof("resource engine %s has been inited successfully", 
e.name)
+
+       // Memory store runs single-replica; leader election is not needed.
+       if ctx.Config().Store.Type == storecfg.Memory {
+               return nil
+       }
+
+       dbSrc, ok := storeComponent.(leader.DBSource)
+       if !ok {
+               return nil
+       }
+       db, hasDB := dbSrc.GetDB()
+       if !hasDB {
+               return nil
+       }
+       holderID, err := leader.GenerateHolderID()
+       if err != nil {
+               logger.Warnf("engine: failed to generate holder ID, skipping 
leader election: %v", err)
+               return nil
+       }
+       le := leader.NewLeaderElection(db, runtime.ResourceEngine, holderID)
+       if err := le.EnsureTable(); err != nil {
+               logger.Warnf("engine: failed to ensure leader lease table: %v", 
err)
+               return nil
+       }
+       e.leaderElection = le
+       e.needsLeaderElection = true
+       logger.Infof("engine: leader election initialized (holder: %s)", 
holderID)
        return nil
 }
 
@@ -146,15 +181,55 @@ func (e *engineComponent) initSubscribers(eventbus 
events.EventBus) error {
 }
 
 func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
-       // 1. subscribe resource changed events
-       for _, sub := range e.subscribers {
-               if err := e.subscriptionManager.Subscribe(sub); err != nil {
-                       return fmt.Errorf("could not subscribe %s to eventbus, 
%w", sub.Name(), err)
+       if !e.needsLeaderElection {
+               return e.startBusinessLogic(ch)
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       go func() {
+               <-ch
+               cancel()
+       }()
+
+       var leaderStopCh chan struct{}
+
+       e.leaderElection.RunLeaderElection(ctx, ch,
+               func() { // onStartLeading: create a fresh stopCh for this 
leadership term
+                       leaderStopCh = make(chan struct{})
+                       logger.Infof("engine: became leader, starting business 
logic")
+                       if err := e.startBusinessLogic(leaderStopCh); err != 
nil {
+                               logger.Errorf("engine: failed to start business 
logic: %v", err)
+                       }
+               },
+               func() { // onStopLeading: stop informers from the current term
+                       logger.Warnf("engine: lost leadership, stopping 
business logic")
+                       if leaderStopCh != nil {
+                               close(leaderStopCh)
+                               leaderStopCh = nil
+                       }
+               },
+       )
+
+       return nil
+}
+
+// startBusinessLogic starts subscribers and informers using the provided 
stopCh.
+// When stopCh is closed all informer goroutines will exit.
+func (e *engineComponent) startBusinessLogic(stopCh <-chan struct{}) error {
+       // 1. subscribe resource changed events (only once for the process 
lifetime)
+       if !e.subscribed.Load() {
+               for _, sub := range e.subscribers {
+                       if err := e.subscriptionManager.Subscribe(sub); err != 
nil {
+                               return fmt.Errorf("could not subscribe %s to 
eventbus, %w", sub.Name(), err)
+                       }
                }
+               e.subscribed.Store(true)
        }
        // 2. start informers
        for _, informer := range e.informers {
-               go informer.Run(ch)
+               go informer.Run(stopCh)
        }
        logger.Infof("resource engine %s has started successfully", e.name)
        return nil
diff --git a/pkg/core/leader/db_source.go b/pkg/core/leader/db_source.go
new file mode 100644
index 00000000..66d37146
--- /dev/null
+++ b/pkg/core/leader/db_source.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import "gorm.io/gorm"
+
+// DBSource is an interface for components that provide access to a database 
connection
+// Used by leader election to access the shared database for leader lease 
management
+type DBSource interface {
+       // GetDB returns the shared database connection and a boolean 
indicating if a DB is available
+       // Returns (db, true) if the component is backed by a database, (nil, 
false) otherwise
+       GetDB() (*gorm.DB, bool)
+}
diff --git a/pkg/core/leader/leader.go b/pkg/core/leader/leader.go
new file mode 100644
index 00000000..e6f83147
--- /dev/null
+++ b/pkg/core/leader/leader.go
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "sync/atomic"
+       "time"
+
+       "github.com/google/uuid"
+       "gorm.io/gorm"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+const (
+       // DefaultLeaseDuration is the default duration for a leader lease
+       DefaultLeaseDuration = 30 * time.Second
+       // DefaultRenewInterval is the default interval for renewing the lease
+       DefaultRenewInterval = 10 * time.Second
+       // DefaultAcquireRetryInterval is the default retry interval for 
acquiring leadership
+       DefaultAcquireRetryInterval = 5 * time.Second
+)
+
+// LeaderElection manages leader election for distributed components
+// It uses database-based optimistic locking to ensure only one replica holds 
the lease at any time
+type LeaderElection struct {
+       db             *gorm.DB
+       component      string
+       holderID       string
+       leaseDuration  time.Duration
+       renewInterval  time.Duration
+       acquireRetry   time.Duration
+       isLeader       atomic.Bool
+       currentVersion int64
+       stopCh         chan struct{}
+}
+
+// Option is a functional option for configuring LeaderElection
+type Option func(*LeaderElection)
+
+// WithLeaseDuration sets the lease duration
+func WithLeaseDuration(d time.Duration) Option {
+       return func(le *LeaderElection) {
+               le.leaseDuration = d
+       }
+}
+
+// WithRenewInterval sets the renewal interval
+func WithRenewInterval(d time.Duration) Option {
+       return func(le *LeaderElection) {
+               le.renewInterval = d
+       }
+}
+
+// WithAcquireRetryInterval sets the acquisition retry interval
+func WithAcquireRetryInterval(d time.Duration) Option {
+       return func(le *LeaderElection) {
+               le.acquireRetry = d
+       }
+}
+
+// NewLeaderElection creates a new LeaderElection instance
+func NewLeaderElection(db *gorm.DB, component, holderID string, opts 
...Option) *LeaderElection {
+       le := &LeaderElection{
+               db:            db,
+               component:     component,
+               holderID:      holderID,
+               leaseDuration: DefaultLeaseDuration,
+               renewInterval: DefaultRenewInterval,
+               acquireRetry:  DefaultAcquireRetryInterval,
+               stopCh:        make(chan struct{}),
+       }
+
+       for _, opt := range opts {
+               opt(le)
+       }
+
+       return le
+}
+
+// EnsureTable creates the leader_leases table if it doesn't exist
+// This is idempotent and can be called multiple times
+func (le *LeaderElection) EnsureTable() error {
+       return le.db.AutoMigrate(&LeaderLease{})
+}
+
+// TryAcquire attempts to acquire the leader lease from an expired holder.
+// It only competes for leases that have already expired and does NOT renew an
+// existing self-held lease — use Renew for that.
+// Returns true if the current holder successfully acquired the lease.
+func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
+       now := time.Now()
+       expiresAt := now.Add(le.leaseDuration)
+
+       // Only take over an expired lease; never pre-empt an active holder.
+       result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+               Where("component = ? AND expires_at < ?", le.component, now).
+               Updates(map[string]interface{}{
+                       "holder_id":   le.holderID,
+                       "acquired_at": now,
+                       "expires_at":  expiresAt,
+                       "version":     gorm.Expr("version + 1"),
+               })
+
+       if result.Error != nil {
+               logger.Warnf("leader election: failed to update lease for 
component %s: %v", le.component, result.Error)
+               le.isLeader.Store(false)
+               return false
+       }
+
+       // If the update succeeded (found a row to update)
+       if result.RowsAffected > 0 {
+               // Fetch the updated version
+               var lease LeaderLease
+               err := le.db.WithContext(ctx).
+                       Where("component = ?", le.component).
+                       First(&lease).Error
+               if err != nil {
+                       logger.Warnf("leader election: failed to read back 
updated lease for component %s: %v", le.component, err)
+                       le.isLeader.Store(false)
+                       return false
+               }
+               le.currentVersion = lease.Version
+               le.isLeader.Store(true)
+               return true
+       }
+
+       // No row was updated, try to insert a new record (lease doesn't exist)
+       result = le.db.WithContext(ctx).Create(&LeaderLease{
+               Component:  le.component,
+               HolderID:   le.holderID,
+               AcquiredAt: now,
+               ExpiresAt:  expiresAt,
+               Version:    1,
+       })
+
+       if result.Error != nil {
+               // If insertion fails, it means another replica just created it
+               // This is expected in concurrent scenarios
+               logger.Debugf("leader election: failed to insert lease for 
component %s (probably created by another replica): %v", le.component, 
result.Error)
+               le.isLeader.Store(false)
+               return false
+       }
+
+       le.currentVersion = 1
+       le.isLeader.Store(true)
+       return true
+}
+
+// Renew attempts to renew the current leader lease
+// Returns true if the renewal was successful
+func (le *LeaderElection) Renew(ctx context.Context) bool {
+       if !le.isLeader.Load() {
+               return false
+       }
+
+       now := time.Now()
+       expiresAt := now.Add(le.leaseDuration)
+
+       result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+               Where("component = ? AND holder_id = ? AND version = ?", 
le.component, le.holderID, le.currentVersion).
+               Updates(map[string]interface{}{
+                       "acquired_at": now,
+                       "expires_at":  expiresAt,
+                       "version":     gorm.Expr("version + 1"),
+               })
+
+       if result.Error != nil {
+               logger.Warnf("leader election: failed to renew lease for 
component %s: %v", le.component, result.Error)
+               le.isLeader.Store(false)
+               return false
+       }
+
+       if result.RowsAffected > 0 {
+               le.currentVersion++
+               return true
+       }
+
+       // Lease was lost (likely held by another replica now)
+       logger.Warnf("leader election: lost leader lease for component %s 
(renewal failed, version mismatch)", le.component)
+       le.isLeader.Store(false)
+       return false
+}
+
+// Release releases the leader lease for this holder
+// This should be called when the holder voluntarily gives up leadership
+func (le *LeaderElection) Release(ctx context.Context) {
+       le.isLeader.Store(false)
+
+       expiresAt := time.Now().Add(-1 * time.Second) // Immediately expire the 
lease
+
+       result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+               Where("component = ? AND holder_id = ?", le.component, 
le.holderID).
+               Update("expires_at", expiresAt)
+
+       if result.Error != nil {
+               logger.Warnf("leader election: failed to release lease for 
component %s: %v", le.component, result.Error)
+       }
+}
+
+// IsLeader returns true if this holder currently holds the leader lease
+func (le *LeaderElection) IsLeader() bool {
+       return le.isLeader.Load()
+}
+
+// RunLeaderElection runs the leader election loop
+// It blocks and runs onStartLeading/onStopLeading callbacks as leadership 
changes
+// This is designed to be run in a separate goroutine
+func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan 
struct{},
+       onStartLeading func(), onStopLeading func()) {
+
+       ticker := time.NewTicker(le.acquireRetry)
+       defer ticker.Stop()
+
+       renewTicker := time.NewTicker(le.renewInterval)
+       defer renewTicker.Stop()
+       renewTicker.Stop() // Don't start renewal ticker yet
+
+       isLeader := false
+
+       for {
+               select {
+               case <-ctx.Done():
+                       if isLeader {
+                               le.Release(context.Background())
+                               onStopLeading()
+                       }
+                       return
+               case <-stopCh:
+                       if isLeader {
+                               le.Release(context.Background())
+                               onStopLeading()
+                       }
+                       return
+               case <-ticker.C:
+                       // Try to acquire leadership if not already leader
+                       if !isLeader {
+                               if le.TryAcquire(ctx) {
+                                       logger.Infof("leader election: 
component %s acquired leadership (holder: %s)", le.component, le.holderID)
+                                       isLeader = true
+                                       renewTicker.Reset(le.renewInterval)
+                                       onStartLeading()
+                               }
+                       }
+               case <-renewTicker.C:
+                       // Renew leadership if currently leader
+                       if isLeader {
+                               if !le.Renew(ctx) {
+                                       logger.Warnf("leader election: 
component %s lost leadership (holder: %s)", le.component, le.holderID)
+                                       isLeader = false
+                                       renewTicker.Stop()
+                                       ticker.Reset(le.acquireRetry)
+                                       onStopLeading()
+                               }
+                       }
+               }
+       }
+}
+
+// GenerateHolderID generates a unique holder ID combining hostname and UUID
+func GenerateHolderID() (string, error) {
+       hostname, err := os.Hostname()
+       if err != nil {
+               hostname = "unknown"
+       }
+       return fmt.Sprintf("%s-%s", hostname, uuid.New().String()), nil
+}
diff --git a/pkg/core/leader/leader_test.go b/pkg/core/leader/leader_test.go
new file mode 100644
index 00000000..525a1bb2
--- /dev/null
+++ b/pkg/core/leader/leader_test.go
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "gorm.io/driver/sqlite"
+       "gorm.io/gorm"
+)
+
+// setupTestDB creates an in-memory SQLite database for testing
+func setupTestDB(t *testing.T) *gorm.DB {
+       db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
+       require.NoError(t, err)
+       return db
+}
+
+func TestLeaderElection_EnsureTable(t *testing.T) {
+       db := setupTestDB(t)
+       le := NewLeaderElection(db, "test-component", "holder-1")
+
+       err := le.EnsureTable()
+       assert.NoError(t, err)
+
+       // Verify the table exists by creating another instance and checking 
again
+       err = le.EnsureTable()
+       assert.NoError(t, err)
+}
+
+func TestLeaderElection_TryAcquire(t *testing.T) {
+       db := setupTestDB(t)
+       le := NewLeaderElection(db, "test-component", "holder-1")
+       err := le.EnsureTable()
+       require.NoError(t, err)
+
+       ctx := context.Background()
+
+       // First attempt should succeed (no lease exists)
+       acquired := le.TryAcquire(ctx)
+       assert.True(t, acquired)
+       assert.True(t, le.IsLeader())
+
+       // Verify the lease was created
+       var lease LeaderLease
+       result := db.Where("component = ?", "test-component").First(&lease)
+       assert.NoError(t, result.Error)
+       assert.Equal(t, "holder-1", lease.HolderID)
+}
+
+func TestLeaderElection_TryAcquire_AlreadyHeld(t *testing.T) {
+       db := setupTestDB(t)
+       le1 := NewLeaderElection(db, "test-component", "holder-1")
+       le2 := NewLeaderElection(db, "test-component", "holder-2")
+
+       err := le1.EnsureTable()
+       require.NoError(t, err)
+
+       ctx := context.Background()
+
+       // First holder acquires
+       acquired := le1.TryAcquire(ctx)
+       assert.True(t, acquired)
+
+       // Second holder tries to acquire (should fail because lease is not 
expired)
+       acquired = le2.TryAcquire(ctx)
+       assert.False(t, acquired)
+       assert.False(t, le2.IsLeader())
+}
+
+func TestLeaderElection_Renew(t *testing.T) {
+       db := setupTestDB(t)
+       le := NewLeaderElection(db, "test-component", "holder-1",
+               WithLeaseDuration(1*time.Second),
+               WithRenewInterval(500*time.Millisecond))
+
+       err := le.EnsureTable()
+       require.NoError(t, err)
+
+       ctx := context.Background()
+
+       // Acquire the lease first
+       acquired := le.TryAcquire(ctx)
+       assert.True(t, acquired)
+
+       oldVersion := le.currentVersion
+
+       // Renew should succeed
+       renewed := le.Renew(ctx)
+       assert.True(t, renewed)
+       assert.Greater(t, le.currentVersion, oldVersion)
+
+       // Verify the lease was updated
+       var lease LeaderLease
+       result := db.Where("component = ?", "test-component").First(&lease)
+       assert.NoError(t, result.Error)
+       assert.Greater(t, lease.Version, int64(1))
+}
+
+func TestLeaderElection_Release(t *testing.T) {
+       db := setupTestDB(t)
+       le := NewLeaderElection(db, "test-component", "holder-1")
+       err := le.EnsureTable()
+       require.NoError(t, err)
+
+       ctx := context.Background()
+
+       // Acquire the lease
+       acquired := le.TryAcquire(ctx)
+       assert.True(t, acquired)
+
+       // Release it
+       le.Release(ctx)
+       assert.False(t, le.IsLeader())
+
+       // Verify the lease has expired
+       var lease LeaderLease
+       result := db.Where("component = ?", "test-component").First(&lease)
+       assert.NoError(t, result.Error)
+       assert.True(t, lease.ExpiresAt.Before(time.Now()))
+}
+
+func TestLeaderElection_Failover(t *testing.T) {
+       db := setupTestDB(t)
+       le1 := NewLeaderElection(db, "test-component", "holder-1",
+               WithLeaseDuration(100*time.Millisecond))
+       le2 := NewLeaderElection(db, "test-component", "holder-2",
+               WithLeaseDuration(100*time.Millisecond))
+
+       err := le1.EnsureTable()
+       require.NoError(t, err)
+
+       ctx := context.Background()
+
+       // First holder acquires
+       acquired := le1.TryAcquire(ctx)
+       assert.True(t, acquired)
+
+       // Second holder cannot acquire yet
+       acquired = le2.TryAcquire(ctx)
+       assert.False(t, acquired)
+
+       // Wait for lease to expire
+       time.Sleep(150 * time.Millisecond)
+
+       // Now second holder should be able to acquire
+       acquired = le2.TryAcquire(ctx)
+       assert.True(t, acquired)
+       assert.True(t, le2.IsLeader())
+}
+
+func TestGenerateHolderID(t *testing.T) {
+       id1, err := GenerateHolderID()
+       assert.NoError(t, err)
+       assert.NotEmpty(t, id1)
+
+       id2, err := GenerateHolderID()
+       assert.NoError(t, err)
+       assert.NotEmpty(t, id2)
+
+       // IDs should be different (due to UUID)
+       assert.NotEqual(t, id1, id2)
+}
+
+func TestLeaderElection_IsLeader(t *testing.T) {
+       db := setupTestDB(t)
+       le := NewLeaderElection(db, "test-component", "holder-1")
+       err := le.EnsureTable()
+       require.NoError(t, err)
+
+       ctx := context.Background()
+
+       // Initially not leader
+       assert.False(t, le.IsLeader())
+
+       // After acquiring, should be leader
+       le.TryAcquire(ctx)
+       assert.True(t, le.IsLeader())
+
+       // After releasing, should not be leader
+       le.Release(ctx)
+       assert.False(t, le.IsLeader())
+}
diff --git a/pkg/core/leader/model.go b/pkg/core/leader/model.go
new file mode 100644
index 00000000..a6180955
--- /dev/null
+++ b/pkg/core/leader/model.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import "time"
+
+// LeaderLease is the GORM model for the leader_leases table
+// It uses optimistic locking via the Version field to ensure atomic leader 
elections
+type LeaderLease struct {
+       ID         uint      `gorm:"primaryKey;autoIncrement"`
+       Component  string    `gorm:"uniqueIndex;size:64;not null"`
+       HolderID   string    `gorm:"size:255;not null"`
+       AcquiredAt time.Time `gorm:"not null"`
+       ExpiresAt  time.Time `gorm:"not null"`
+       Version    int64     `gorm:"not null;default:0"`
+}
+
+// TableName returns the table name for LeaderLease
+func (LeaderLease) TableName() string {
+       return "leader_leases"
+}
diff --git a/pkg/core/store/component.go b/pkg/core/store/component.go
index 6eadc7f2..2cae0097 100644
--- a/pkg/core/store/component.go
+++ b/pkg/core/store/component.go
@@ -20,7 +20,11 @@ package store
 import (
        "fmt"
        "math"
+       "reflect"
 
+       "gorm.io/gorm"
+
+       "github.com/apache/dubbo-admin/pkg/core/leader"
        coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
        "github.com/apache/dubbo-admin/pkg/core/runtime"
 )
@@ -34,6 +38,12 @@ type Router interface {
        ResourceKindRoute(k coremodel.ResourceKind) (ResourceStore, error)
 }
 
+// poolProvider is an internal interface for stores that provide DB access
+// This avoids circular imports by not referencing dbcommon directly
+type poolProvider interface {
+       Pool() interface{} // Returns *ConnectionPool, but we don't type it to 
avoid import
+}
+
 // The Component interface is composed of both functional interfaces and 
lifecycle interfaces
 type Component interface {
        runtime.Component
@@ -48,6 +58,9 @@ type storeComponent struct {
        stores map[coremodel.ResourceKind]ManagedResourceStore
 }
 
+// Compile-time check that storeComponent implements leader.DBSource interface
+var _ leader.DBSource = &storeComponent{}
+
 func (sc *storeComponent) RequiredDependencies() []runtime.ComponentType {
        return []runtime.ComponentType{
                runtime.EventBus, // Store may need EventBus for event emission
@@ -109,3 +122,32 @@ func (sc *storeComponent) ResourceKindRoute(k 
coremodel.ResourceKind) (ResourceS
        return nil, fmt.Errorf("%s is not supported by store yet", k)
 
 }
+
+// GetDB returns the shared DB connection if the underlying store is DB-backed
+// Implements the leader.DBSource interface
+func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
+       // Try to get DB from any store that has a Pool() method (all 
GormStores share the same ConnectionPool)
+       for _, store := range sc.stores {
+               pp, ok := store.(poolProvider)
+               if !ok {
+                       continue
+               }
+               pool := pp.Pool()
+               if pool == nil {
+                       continue
+               }
+               // Use reflection to call GetDB() on the pool to avoid 
importing dbcommon
+               poolVal := reflect.ValueOf(pool)
+               getDBMethod := poolVal.MethodByName("GetDB")
+               if !getDBMethod.IsValid() {
+                       continue
+               }
+               result := getDBMethod.Call(nil)
+               if len(result) > 0 {
+                       if db, ok := result[0].Interface().(*gorm.DB); ok {
+                               return db, true
+                       }
+               }
+       }
+       return nil, false
+}
diff --git a/pkg/store/dbcommon/gorm_store.go b/pkg/store/dbcommon/gorm_store.go
index e823189b..f72b4376 100644
--- a/pkg/store/dbcommon/gorm_store.go
+++ b/pkg/store/dbcommon/gorm_store.go
@@ -582,3 +582,10 @@ func (gs *GormStore) rebuildIndices() error {
        logger.Infof("Rebuilt indices for %s: loaded %d resources", 
gs.kind.ToString(), len(models))
        return nil
 }
+
+// Pool returns the connection pool for this store
+// Used by other components (e.g., leader election) that need direct DB access
+// Returns interface{} to satisfy the poolProvider interface in pkg/core/store
+func (gs *GormStore) Pool() interface{} {
+       return gs.pool
+}

Reply via email to