This is an automated email from the ASF dual-hosted git repository.
alinsran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 7c926e66 feat: support retry in case of sync failure (#2534)
7c926e66 is described below
commit 7c926e66b70256c5e132863ab0b3e2ec7fb4043d
Author: AlinsRan <[email protected]>
AuthorDate: Tue Sep 2 08:59:31 2025 +0800
feat: support retry in case of sync failure (#2534)
---
internal/provider/apisix/provider.go | 41 +++++++------
internal/provider/common/retrier.go | 96 +++++++++++++++++++++++++++++++
test/e2e/crds/v2/route.go | 55 ++++++++++++++++++
test/e2e/crds/v2/status.go | 42 +++++++++++---
test/e2e/framework/manifests/ingress.yaml | 2 +-
test/e2e/scaffold/scaffold.go | 23 ++++++++
6 files changed, 233 insertions(+), 26 deletions(-)
diff --git a/internal/provider/apisix/provider.go
b/internal/provider/apisix/provider.go
index 0246b3e7..f6a8f876 100644
--- a/internal/provider/apisix/provider.go
+++ b/internal/provider/apisix/provider.go
@@ -37,11 +37,19 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
"github.com/apache/apisix-ingress-controller/internal/provider"
+ "github.com/apache/apisix-ingress-controller/internal/provider/common"
"github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
)
-const ProviderTypeAPISIX = "apisix"
+const (
+ ProviderTypeAPISIX = "apisix"
+
+ RetryBaseDelay = 1 * time.Second
+ RetryMaxDelay = 1000 * time.Second
+
+ MinSyncPeriod = 1 * time.Second
+)
type apisixProvider struct {
provider.Options
@@ -223,33 +231,32 @@ func (d *apisixProvider) Start(ctx context.Context) error
{
initalSyncDelay := d.InitSyncDelay
if initalSyncDelay > 0 {
- time.AfterFunc(initalSyncDelay, func() {
- if err := d.sync(ctx); err != nil {
- log.Error(err)
- return
- }
- })
+ time.AfterFunc(initalSyncDelay, d.syncNotify)
}
- if d.SyncPeriod < 1 {
- return nil
+ syncPeriod := d.SyncPeriod
+ if syncPeriod < MinSyncPeriod {
+ syncPeriod = MinSyncPeriod
}
- ticker := time.NewTicker(d.SyncPeriod)
+ ticker := time.NewTicker(syncPeriod)
defer ticker.Stop()
+
+ retrier :=
common.NewRetrier(common.NewExponentialBackoff(RetryBaseDelay, RetryMaxDelay))
+
for {
- synced := false
select {
case <-d.syncCh:
- synced = true
case <-ticker.C:
- synced = true
+ case <-retrier.C():
case <-ctx.Done():
+ retrier.Reset()
return nil
}
- if synced {
- if err := d.sync(ctx); err != nil {
- log.Error(err)
- }
+ if err := d.sync(ctx); err != nil {
+ log.Error(err)
+ retrier.Next()
+ } else {
+ retrier.Reset()
}
}
}
diff --git a/internal/provider/common/retrier.go
b/internal/provider/common/retrier.go
new file mode 100644
index 00000000..1277ee93
--- /dev/null
+++ b/internal/provider/common/retrier.go
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "sync"
+ "time"
+)
+
+type Backoff interface {
+ Next() time.Duration
+ Reset()
+}
+
+type ExponentialBackoff struct {
+ base, max, current time.Duration
+}
+
+func NewExponentialBackoff(base, max time.Duration) *ExponentialBackoff {
+ return &ExponentialBackoff{base: base, max: max, current: base}
+}
+
+func (b *ExponentialBackoff) Next() time.Duration {
+ delay := b.current
+ b.current *= 2
+ if b.current > b.max {
+ b.current = b.max
+ }
+ return delay
+}
+
+func (b *ExponentialBackoff) Reset() {
+ b.current = b.base
+}
+
+type Retrier struct {
+ mu sync.Mutex
+ ch chan struct{}
+ timer *time.Timer
+ backoff Backoff
+}
+
+func NewRetrier(b Backoff) *Retrier {
+ return &Retrier{
+ ch: make(chan struct{}, 1),
+ backoff: b,
+ }
+}
+
+func (r *Retrier) Reset() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if r.timer != nil {
+ r.timer.Stop()
+ r.timer = nil
+ }
+ r.backoff.Reset()
+}
+
+func (r *Retrier) Next() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if r.timer != nil {
+ r.timer.Stop()
+ r.timer = nil
+ }
+
+ delay := r.backoff.Next()
+ r.timer = time.AfterFunc(delay, func() {
+ select {
+ case r.ch <- struct{}{}:
+ default:
+ }
+ })
+}
+
+func (r *Retrier) C() <-chan struct{} {
+ return r.ch
+}
diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go
index 605a7179..ac1e47c7 100644
--- a/test/e2e/crds/v2/route.go
+++ b/test/e2e/crds/v2/route.go
@@ -1676,4 +1676,59 @@ spec:
})
})
})
+
+ Context("Exception Test", func() {
+ const apisixRouteSpec = `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: default
+spec:
+ ingressClassName: %s
+ http:
+ - name: rule0
+ match:
+ hosts:
+ - httpbin
+ paths:
+ - /*
+ backends:
+ - serviceName: httpbin-service-e2e-test
+ servicePort: 80
+`
+ It("try again when sync failed", func() {
+ s.Deployer.ScaleDataplane(0)
+
+ err :=
s.CreateResourceFromString(fmt.Sprintf(apisixRouteSpec, s.Namespace()))
+ Expect(err).NotTo(HaveOccurred(), "creating
ApisixRoute")
+
+ By("check ApisixRoute status")
+ s.RetryAssertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml", "-n", s.Namespace())
+ return output
+ }).WithTimeout(30 * time.Second).
+ Should(
+ And(
+ ContainSubstring(`status:
"False"`),
+ ContainSubstring(`reason:
SyncFailed`),
+ ),
+ )
+
+ s.Deployer.ScaleDataplane(1)
+
+ s.RetryAssertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml", "-n", s.Namespace())
+ return output
+ }).WithTimeout(60 * time.Second).
+ Should(ContainSubstring(`status: "True"`))
+
+ By("check route in APISIX")
+ s.RequestAssert(&scaffold.RequestAssert{
+ Method: "GET",
+ Path: "/get",
+ Host: "httpbin",
+ Check: scaffold.WithExpectedStatus(200),
+ })
+ })
+ })
})
diff --git a/test/e2e/crds/v2/status.go b/test/e2e/crds/v2/status.go
index 06170a6d..7b7defe6 100644
--- a/test/e2e/crds/v2/status.go
+++ b/test/e2e/crds/v2/status.go
@@ -24,6 +24,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"
@@ -41,7 +42,7 @@ var _ = Describe("Test CRD Status",
Label("apisix.apache.org", "v2", "apisixrout
Context("Test ApisixRoute Sync Status", func() {
BeforeEach(func() {
By("create GatewayProxy")
- gatewayProxy := s.GetGatewayProxyYaml()
+ gatewayProxy := s.GetGatewayProxyWithServiceYaml()
err := s.CreateResourceFromString(gatewayProxy)
Expect(err).NotTo(HaveOccurred(), "creating
GatewayProxy")
time.Sleep(5 * time.Second)
@@ -144,7 +145,8 @@ spec:
It("dataplane unavailable", func() {
By("apply ApisixRoute")
- applier.MustApplyAPIv2(types.NamespacedName{Namespace:
s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, fmt.Sprintf(ar,
s.Namespace(), s.Namespace()))
+ arYaml := fmt.Sprintf(ar, s.Namespace(), s.Namespace())
+ applier.MustApplyAPIv2(types.NamespacedName{Namespace:
s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, arYaml)
By("check route in APISIX")
s.RequestAssert(&scaffold.RequestAssert{
@@ -154,13 +156,28 @@ spec:
Check: scaffold.WithExpectedStatus(200),
})
- s.Deployer.ScaleDataplane(0)
+ By("get yaml from service")
+ serviceYaml, err := s.GetOutputFromString("svc",
framework.ProviderType, "-o", "yaml")
+ Expect(err).NotTo(HaveOccurred(), "getting service
yaml")
+ By("update service to type ExternalName with invalid
host")
+ var k8sservice corev1.Service
+ err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice)
+ Expect(err).NotTo(HaveOccurred(), "unmarshalling
service")
+ oldSpec := k8sservice.Spec
+ k8sservice.Spec = corev1.ServiceSpec{
+ Type: corev1.ServiceTypeExternalName,
+ ExternalName: "invalid.host",
+ }
+ newServiceYaml, err := yaml.Marshal(k8sservice)
+ Expect(err).NotTo(HaveOccurred(), "marshalling service")
+ err = s.CreateResourceFromString(string(newServiceYaml))
+ Expect(err).NotTo(HaveOccurred(), "creating service")
By("check ApisixRoute status")
s.RetryAssertion(func() string {
- output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml", "-n", s.Namespace())
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
return output
- }).WithTimeout(80 * time.Second).
+ }).WithTimeout(60 * time.Second).
Should(
And(
ContainSubstring(`status:
"False"`),
@@ -168,13 +185,22 @@ spec:
),
)
- s.Deployer.ScaleDataplane(1)
+ By("update service to original spec")
+ serviceYaml, err = s.GetOutputFromString("svc",
framework.ProviderType, "-o", "yaml")
+ Expect(err).NotTo(HaveOccurred(), "getting service
yaml")
+ err = yaml.Unmarshal([]byte(serviceYaml), &k8sservice)
+ Expect(err).NotTo(HaveOccurred(), "unmarshalling
service")
+ k8sservice.Spec = oldSpec
+ newServiceYaml, err = yaml.Marshal(k8sservice)
+ Expect(err).NotTo(HaveOccurred(), "marshalling service")
+ err = s.CreateResourceFromString(string(newServiceYaml))
+ Expect(err).NotTo(HaveOccurred(), "creating service")
By("check ApisixRoute status after scaling up")
s.RetryAssertion(func() string {
- output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml", "-n", s.Namespace())
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
return output
- }).WithTimeout(80 * time.Second).
+ }).WithTimeout(60 * time.Second).
Should(
And(
ContainSubstring(`status:
"True"`),
diff --git a/test/e2e/framework/manifests/ingress.yaml
b/test/e2e/framework/manifests/ingress.yaml
index 1ea6a88f..c411a93d 100644
--- a/test/e2e/framework/manifests/ingress.yaml
+++ b/test/e2e/framework/manifests/ingress.yaml
@@ -333,7 +333,7 @@ data:
# The period between two consecutive
syncs.
# The default value is 0 seconds,
which means the controller will not sync.
# If you want to enable the sync, set
it to a positive value.
- init_sync_delay: {{ .InitSyncDelay | default "1m" }}
+ init_sync_delay: {{ .InitSyncDelay | default "20m" }}
---
apiVersion: v1
kind: Service
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 5d4509c2..9d8dfff1 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -460,6 +460,25 @@ spec:
value: "%s"
`
+const gatewayProxyWithServiceYaml = `
+apiVersion: apisix.apache.org/v1alpha1
+kind: GatewayProxy
+metadata:
+ name: %s
+ namespace: %s
+spec:
+ provider:
+ type: ControlPlane
+ controlPlane:
+ service:
+ name: %s
+ port: 9180
+ auth:
+ type: AdminKey
+ adminKey:
+ value: "%s"
+`
+
const ingressClassYaml = `
apiVersion: networking.k8s.io/v1
kind: IngressClass
@@ -479,6 +498,10 @@ func (s *Scaffold) GetGatewayProxyYaml() string {
return fmt.Sprintf(gatewayProxyYaml, s.namespace, s.namespace,
s.Deployer.GetAdminEndpoint(), s.AdminKey())
}
+func (s *Scaffold) GetGatewayProxyWithServiceYaml() string {
+ return fmt.Sprintf(gatewayProxyWithServiceYaml, s.namespace,
s.namespace, s.dataplaneService.Name, s.AdminKey())
+}
+
func (s *Scaffold) GetIngressClassYaml() string {
return fmt.Sprintf(ingressClassYaml, s.namespace,
s.GetControllerName(), s.namespace, s.namespace)
}