Copilot commented on code in PR #1423:
URL: https://github.com/apache/dubbo-admin/pull/1423#discussion_r3001362946
##########
pkg/console/counter/component.go:
##########
@@ -61,13 +71,44 @@ func (c *managerComponent) Order() int {
return math.MaxInt - 1
}
-func (c *managerComponent) Init(runtime.BuilderContext) error {
+func (c *managerComponent) Init(ctx runtime.BuilderContext) error {
mgr := NewCounterManager()
c.manager = mgr
+
+ // Memory store runs single-replica; leader election is not needed.
+ if ctx.Config().Store.Type == storecfg.Memory {
+ return nil
+ }
+
+ storeComponent, err := ctx.GetActivatedComponent(runtime.ResourceStore)
+ if err != nil {
+ return nil
Review Comment:
The leader-election setup path swallows the error from
`GetActivatedComponent(runtime.ResourceStore)` by returning nil. Since
ResourceStore is a required dependency, failing to resolve it should likely be
returned (or at least logged) rather than silently disabling leader election
and proceeding.
```suggestion
logger.Errorf("counter: failed to get ResourceStore component: %v", err)
return err
```
##########
pkg/core/leader/leader.go:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ "gorm.io/gorm"
+
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+const (
+ // DefaultLeaseDuration is the default duration for a leader lease
+ DefaultLeaseDuration = 30 * time.Second
+ // DefaultRenewInterval is the default interval for renewing the lease
+ DefaultRenewInterval = 10 * time.Second
+ // DefaultAcquireRetryInterval is the default retry interval for
acquiring leadership
+ DefaultAcquireRetryInterval = 5 * time.Second
+)
+
+// LeaderElection manages leader election for distributed components
+// It uses database-based optimistic locking to ensure only one replica holds
the lease at any time
+type LeaderElection struct {
+ db *gorm.DB
+ component string
+ holderID string
+ leaseDuration time.Duration
+ renewInterval time.Duration
+ acquireRetry time.Duration
+ isLeader atomic.Bool
+ currentVersion int64
+ stopCh chan struct{}
+}
+
+// Option is a functional option for configuring LeaderElection
+type Option func(*LeaderElection)
+
+// WithLeaseDuration sets the lease duration
+func WithLeaseDuration(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.leaseDuration = d
+ }
+}
+
+// WithRenewInterval sets the renewal interval
+func WithRenewInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.renewInterval = d
+ }
+}
+
+// WithAcquireRetryInterval sets the acquisition retry interval
+func WithAcquireRetryInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.acquireRetry = d
+ }
+}
+
+// NewLeaderElection creates a new LeaderElection instance
+func NewLeaderElection(db *gorm.DB, component, holderID string, opts
...Option) *LeaderElection {
+ le := &LeaderElection{
+ db: db,
+ component: component,
+ holderID: holderID,
+ leaseDuration: DefaultLeaseDuration,
+ renewInterval: DefaultRenewInterval,
+ acquireRetry: DefaultAcquireRetryInterval,
+ stopCh: make(chan struct{}),
+ }
+
+ for _, opt := range opts {
+ opt(le)
+ }
+
+ return le
+}
+
+// EnsureTable creates the leader_leases table if it doesn't exist
+// This is idempotent and can be called multiple times
+func (le *LeaderElection) EnsureTable() error {
+ return le.db.AutoMigrate(&LeaderLease{})
+}
+
+// TryAcquire attempts to acquire the leader lease from an expired holder.
+// It only competes for leases that have already expired and does NOT renew an
+// existing self-held lease — use Renew for that.
+// Returns true if the current holder successfully acquired the lease.
+func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
+ now := time.Now()
+ expiresAt := now.Add(le.leaseDuration)
+
+ // Only take over an expired lease; never pre-empt an active holder.
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND expires_at < ?", le.component, now).
+ Updates(map[string]interface{}{
+ "holder_id": le.holderID,
+ "acquired_at": now,
+ "expires_at": expiresAt,
+ "version": gorm.Expr("version + 1"),
+ })
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to update lease for
component %s: %v", le.component, result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ // If the update succeeded (found a row to update)
+ if result.RowsAffected > 0 {
+ // Fetch the updated version
+ var lease LeaderLease
+ err := le.db.WithContext(ctx).
+ Where("component = ?", le.component).
+ First(&lease).Error
+ if err == nil {
+ le.currentVersion = lease.Version
+ }
Review Comment:
When taking over an expired lease (`RowsAffected > 0`), the code marks
itself leader even if reading back the lease fails. In that case
`currentVersion` may remain 0, and subsequent `Renew` will immediately fail due
to version mismatch, causing leadership flapping. Treat the read-back failure
as an acquisition failure (return false) or ensure `currentVersion` is set
deterministically after the update.
```suggestion
if err != nil {
logger.Warnf("leader election: failed to read back updated lease for component %s: %v", le.component, err)
le.isLeader.Store(false)
return false
}
le.currentVersion = lease.Version
```
##########
pkg/core/discovery/component.go:
##########
@@ -123,11 +192,12 @@ func (d *discoveryComponent) Start(_ runtime.Runtime, ch
<-chan struct{}) error
fmt.Sprintf("subscriber %s can not subscribe
resource changed events", sub.Name()))
}
}
Review Comment:
`startBusinessLogic` re-subscribes the same subscribers on every leadership
term. Since the EventBus enforces unique subscriber names, re-election after
leadership loss will return an error here and prevent informers from being
started again. Subscribe only once, or Unsubscribe when leadership is lost, or
handle the duplicate subscription error without aborting informer startup.
##########
pkg/core/engine/component.go:
##########
@@ -154,7 +224,7 @@ func (e *engineComponent) Start(_ runtime.Runtime, ch
<-chan struct{}) error {
}
Review Comment:
`startBusinessLogic` calls `Subscribe` on every leadership term. The
EventBus rejects duplicate subscriber names per resource kind, so after a
leadership loss + re-election, this will error and return before informers are
restarted, leaving the engine non-functional. Subscribe once (and keep it for
the process lifetime), or Unsubscribe on `onStopLeading`, or treat the
duplicate-name error as non-fatal and continue starting informers.
##########
pkg/core/leader/leader.go:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ "gorm.io/gorm"
+
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+const (
+ // DefaultLeaseDuration is the default duration for a leader lease
+ DefaultLeaseDuration = 30 * time.Second
+ // DefaultRenewInterval is the default interval for renewing the lease
+ DefaultRenewInterval = 10 * time.Second
+ // DefaultAcquireRetryInterval is the default retry interval for
acquiring leadership
+ DefaultAcquireRetryInterval = 5 * time.Second
+)
+
+// LeaderElection manages leader election for distributed components
+// It uses database-based optimistic locking to ensure only one replica holds
the lease at any time
+type LeaderElection struct {
+ db *gorm.DB
+ component string
+ holderID string
+ leaseDuration time.Duration
+ renewInterval time.Duration
+ acquireRetry time.Duration
+ isLeader atomic.Bool
+ currentVersion int64
+ stopCh chan struct{}
+}
+
+// Option is a functional option for configuring LeaderElection
+type Option func(*LeaderElection)
+
+// WithLeaseDuration sets the lease duration
+func WithLeaseDuration(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.leaseDuration = d
+ }
+}
+
+// WithRenewInterval sets the renewal interval
+func WithRenewInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.renewInterval = d
+ }
+}
+
+// WithAcquireRetryInterval sets the acquisition retry interval
+func WithAcquireRetryInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.acquireRetry = d
+ }
+}
+
+// NewLeaderElection creates a new LeaderElection instance
+func NewLeaderElection(db *gorm.DB, component, holderID string, opts
...Option) *LeaderElection {
+ le := &LeaderElection{
+ db: db,
+ component: component,
+ holderID: holderID,
+ leaseDuration: DefaultLeaseDuration,
+ renewInterval: DefaultRenewInterval,
+ acquireRetry: DefaultAcquireRetryInterval,
+ stopCh: make(chan struct{}),
+ }
+
+ for _, opt := range opts {
+ opt(le)
+ }
+
+ return le
+}
+
+// EnsureTable creates the leader_leases table if it doesn't exist
+// This is idempotent and can be called multiple times
+func (le *LeaderElection) EnsureTable() error {
+ return le.db.AutoMigrate(&LeaderLease{})
+}
+
+// TryAcquire attempts to acquire the leader lease from an expired holder.
+// It only competes for leases that have already expired and does NOT renew an
+// existing self-held lease — use Renew for that.
+// Returns true if the current holder successfully acquired the lease.
+func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
+ now := time.Now()
+ expiresAt := now.Add(le.leaseDuration)
+
+ // Only take over an expired lease; never pre-empt an active holder.
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND expires_at < ?", le.component, now).
+ Updates(map[string]interface{}{
+ "holder_id": le.holderID,
+ "acquired_at": now,
+ "expires_at": expiresAt,
+ "version": gorm.Expr("version + 1"),
+ })
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to update lease for
component %s: %v", le.component, result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ // If the update succeeded (found a row to update)
+ if result.RowsAffected > 0 {
+ // Fetch the updated version
+ var lease LeaderLease
+ err := le.db.WithContext(ctx).
+ Where("component = ?", le.component).
+ First(&lease).Error
+ if err == nil {
+ le.currentVersion = lease.Version
+ }
+ le.isLeader.Store(true)
+ return true
+ }
+
+ // No row was updated, try to insert a new record (lease doesn't exist)
+ result = le.db.WithContext(ctx).Create(&LeaderLease{
+ Component: le.component,
+ HolderID: le.holderID,
+ AcquiredAt: now,
+ ExpiresAt: expiresAt,
+ Version: 1,
+ })
+
+ if result.Error != nil {
+ // If insertion fails, it means another replica just created it
+ // This is expected in concurrent scenarios
+ logger.Debugf("leader election: failed to insert lease for
component %s (probably created by another replica): %v", le.component,
result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ le.currentVersion = 1
+ le.isLeader.Store(true)
+ return true
+}
+
+// Renew attempts to renew the current leader lease
+// Returns true if the renewal was successful
+func (le *LeaderElection) Renew(ctx context.Context) bool {
+ if !le.isLeader.Load() {
+ return false
+ }
+
+ now := time.Now()
+ expiresAt := now.Add(le.leaseDuration)
+
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND holder_id = ? AND version = ?",
le.component, le.holderID, le.currentVersion).
+ Updates(map[string]interface{}{
+ "acquired_at": now,
+ "expires_at": expiresAt,
+ "version": gorm.Expr("version + 1"),
+ })
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to renew lease for
component %s: %v", le.component, result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ if result.RowsAffected > 0 {
+ le.currentVersion++
+ return true
+ }
+
+ // Lease was lost (likely held by another replica now)
+ logger.Warnf("leader election: lost leader lease for component %s
(renewal failed, version mismatch)", le.component)
+ le.isLeader.Store(false)
+ return false
+}
+
+// Release releases the leader lease for this holder
+// This should be called when the holder voluntarily gives up leadership
+func (le *LeaderElection) Release(ctx context.Context) {
+ le.isLeader.Store(false)
+
+ expiresAt := time.Now().Add(-1 * time.Second) // Immediately expire the
lease
+
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND holder_id = ?", le.component,
le.holderID).
+ Update("expires_at", expiresAt)
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to release lease for
component %s: %v", le.component, result.Error)
+ }
+}
+
+// IsLeader returns true if this holder currently holds the leader lease
+func (le *LeaderElection) IsLeader() bool {
+ return le.isLeader.Load()
+}
+
+// RunLeaderElection runs the leader election loop
+// It blocks and runs onStartLeading/onStopLeading callbacks as leadership
changes
+// This is designed to be run in a separate goroutine
+func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan
struct{},
+ onStartLeading func(), onStopLeading func()) {
+
+ ticker := time.NewTicker(le.acquireRetry)
+ defer ticker.Stop()
+
+ renewTicker := time.NewTicker(le.renewInterval)
Review Comment:
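`renewTicker` is never stopped on function exit (only `ticker` is deferred).
An unstopped ticker keeps its underlying timer active after the function
returns (and before Go 1.23 it is never garbage-collected). This is easy to
avoid by adding `defer renewTicker.Stop()` right after creating it.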
```suggestion
renewTicker := time.NewTicker(le.renewInterval)
defer renewTicker.Stop()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]