This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 66ff9e59d4c8a6b74f1da7a3bfd3095f947cce65
Author: Antonin Stefanutti <anto...@stefanutti.fr>
AuthorDate: Wed Jun 1 18:46:20 2022 +0200

    feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests
    
    (cherry picked from commit 9d7145e8ab71e8619a205c65d68797084f3d5fb4)
---
 go.mod          |  1 +
 pkg/trait/gc.go | 46 +++++++++++++++++++++++++++++-----------------
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/go.mod b/go.mod
index 2c15a1092..aaf324702 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
        go.uber.org/zap v1.21.0
        golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
        golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+       golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
        gopkg.in/inf.v0 v0.9.1
        gopkg.in/yaml.v2 v2.4.0
        k8s.io/api v0.22.5
diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index b11802900..6ca3af790 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -27,6 +27,8 @@ import (
        "sync"
        "time"
 
+       "golang.org/x/time/rate"
+
        "github.com/apache/camel-k/pkg/util"
        authorization "k8s.io/api/authorization/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,10 +46,13 @@ import (
 )
 
 var (
-       toFileName                  = regexp.MustCompile(`[^(\w/\.)]`)
-       diskCachedDiscoveryClient   discovery.CachedDiscoveryInterface
-       memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
-       discoveryClientLock         sync.Mutex
+       toFileName = regexp.MustCompile(`[^(\w/\.)]`)
+
+       lock                  sync.Mutex
+       rateLimiter           = rate.NewLimiter(rate.Every(time.Minute), 1)
+       collectableGVKs       = make(map[schema.GroupVersionKind]struct{})
+       memoryCachedDiscovery discovery.CachedDiscoveryInterface
+       diskCachedDiscovery   discovery.CachedDiscoveryInterface
 )
 
 type discoveryCacheType string
@@ -187,6 +192,15 @@ func (t *garbageCollectorTrait) canBeDeleted(e 
*Environment, u unstructured.Unst
 }
 
 func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) 
(map[schema.GroupVersionKind]struct{}, error) {
+       lock.Lock()
+       defer lock.Unlock()
+
+       // Rate limit to avoid Discovery and SelfSubjectRulesReview requests at 
every reconciliation.
+       if !rateLimiter.Allow() {
+               // Return the cached set of garbage collectable GVKs.
+               return collectableGVKs, nil
+       }
+
        // We rely on the discovery API to retrieve all the resources GVK,
        // that results in an unbounded set that can impact garbage collection 
latency when scaling up.
        discoveryClient, err := t.discoveryClient()
@@ -196,7 +210,7 @@ func (t *garbageCollectorTrait) getDeletableTypes(e 
*Environment) (map[schema.Gr
        resources, err := discoveryClient.ServerPreferredNamespacedResources()
        // Swallow group discovery errors, e.g., Knative serving exposes
        // an aggregated API for custom.metrics.k8s.io that requires special
-       // authentication scheme while discovering preferred resources
+       // authentication scheme while discovering preferred resources.
        if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
                return nil, err
        }
@@ -237,32 +251,30 @@ func (t *garbageCollectorTrait) getDeletableTypes(e 
*Environment) (map[schema.Gr
                        }
                }
        }
+       collectableGVKs = GVKs
 
-       return GVKs, nil
+       return collectableGVKs, nil
 }
 
 func (t *garbageCollectorTrait) discoveryClient() 
(discovery.DiscoveryInterface, error) {
-       discoveryClientLock.Lock()
-       defer discoveryClientLock.Unlock()
-
        switch *t.DiscoveryCache {
        case diskDiscoveryCache:
-               if diskCachedDiscoveryClient != nil {
-                       return diskCachedDiscoveryClient, nil
+               if diskCachedDiscovery != nil {
+                       return diskCachedDiscovery, nil
                }
                config := t.Client.GetConfig()
                httpCacheDir := filepath.Join(mustHomeDir(), ".kube", 
"http-cache")
                diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", 
"discovery", toHostDir(config.Host))
                var err error
-               diskCachedDiscoveryClient, err = 
disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 
10*time.Minute)
-               return diskCachedDiscoveryClient, err
+               diskCachedDiscovery, err = 
disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 
10*time.Minute)
+               return diskCachedDiscovery, err
 
        case memoryDiscoveryCache:
-               if memoryCachedDiscoveryClient != nil {
-                       return memoryCachedDiscoveryClient, nil
+               if memoryCachedDiscovery != nil {
+                       return memoryCachedDiscovery, nil
                }
-               memoryCachedDiscoveryClient = 
memory.NewMemCacheClient(t.Client.Discovery())
-               return memoryCachedDiscoveryClient, nil
+               memoryCachedDiscovery = 
memory.NewMemCacheClient(t.Client.Discovery())
+               return memoryCachedDiscovery, nil
 
        case disabledDiscoveryCache, "":
                return t.Client.Discovery(), nil

Reply via email to