This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 5a7e49cf585aa987549f76f9301c92b40323acba
Author: nicolaferraro <ni.ferr...@gmail.com>
AuthorDate: Wed Dec 15 11:16:46 2021 +0100

    Fix #1107: generalize server side apply code and reuse
---
 addons/keda/keda.go      |   6 +--
 pkg/client/client.go     |   1 +
 pkg/client/serverside.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++
 pkg/install/kamelets.go  |  86 +-------------------------------
 pkg/util/test/client.go  |   6 +++
 5 files changed, 136 insertions(+), 87 deletions(-)

diff --git a/addons/keda/keda.go b/addons/keda/keda.go
index 65a8bd4..f59edd9 100644
--- a/addons/keda/keda.go
+++ b/addons/keda/keda.go
@@ -117,7 +117,7 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) 
(*kedav1alpha1.ScaledO
 
 func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
        ctrlRef := t.getTopControllerReference(e)
-
+       applier := e.Client.ServerOrClientSideApplier()
        if ctrlRef.Kind == camelv1alpha1.KameletBindingKind {
                // Update the KameletBinding directly (do not add it to env 
resources, it's the integration parent)
                key := client.ObjectKey{
@@ -131,7 +131,7 @@ func (t *kedaTrait) hackControllerReplicas(e 
*trait.Environment) error {
                if klb.Spec.Replicas == nil {
                        one := int32(1)
                        klb.Spec.Replicas = &one
-                       if err := e.Client.Update(e.Ctx, &klb); err != nil {
+                       if err := applier.Apply(e.Ctx, &klb); err != nil {
                                return err
                        }
                }
@@ -139,7 +139,7 @@ func (t *kedaTrait) hackControllerReplicas(e 
*trait.Environment) error {
                if e.Integration.Spec.Replicas == nil {
                        one := int32(1)
                        e.Integration.Spec.Replicas = &one
-                       if err := e.Client.Update(e.Ctx, e.Integration); err != 
nil {
+                       if err := applier.Apply(e.Ctx, e.Integration); err != 
nil {
                                return err
                        }
                }
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 3334e70..2cf73c2 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -63,6 +63,7 @@ type Client interface {
        GetScheme() *runtime.Scheme
        GetConfig() *rest.Config
        GetCurrentNamespace(kubeConfig string) (string, error)
+       ServerOrClientSideApplier() ServerOrClientSideApplier
 }
 
 // Injectable identifies objects that can receive a Client.
diff --git a/pkg/client/serverside.go b/pkg/client/serverside.go
new file mode 100644
index 0000000..6efd758
--- /dev/null
+++ b/pkg/client/serverside.go
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package client
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "strings"
+       "sync"
+       "sync/atomic"
+
+       "github.com/apache/camel-k/pkg/util/log"
+       "github.com/apache/camel-k/pkg/util/patch"
+       "github.com/pkg/errors"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/types"
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type ServerOrClientSideApplier struct {
+       Client             ctrl.Client
+       hasServerSideApply atomic.Value
+       tryServerSideApply sync.Once
+}
+
+func (c *defaultClient) ServerOrClientSideApplier() ServerOrClientSideApplier {
+       return ServerOrClientSideApplier{
+               Client: c,
+       }
+}
+
+func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object 
ctrl.Object) error {
+       once := false
+       var err error
+       a.tryServerSideApply.Do(func() {
+               once = true
+               if err = a.serverSideApply(ctx, object); err != nil {
+                       if isIncompatibleServerError(err) {
+                               log.Info("Fallback to client-side apply for 
installing resources")
+                               a.hasServerSideApply.Store(false)
+                               err = nil
+                       } else {
+                               a.tryServerSideApply = sync.Once{}
+                       }
+               } else {
+                       a.hasServerSideApply.Store(true)
+               }
+       })
+       if err != nil {
+               return err
+       }
+       if v := a.hasServerSideApply.Load(); v.(bool) {
+               if !once {
+                       return a.serverSideApply(ctx, object)
+               }
+       } else {
+               return a.clientSideApply(ctx, object)
+       }
+       return nil
+}
+
+func (a *ServerOrClientSideApplier) serverSideApply(ctx context.Context, 
resource ctrl.Object) error {
+       target, err := patch.PositiveApplyPatch(resource)
+       if err != nil {
+               return err
+       }
+       return a.Client.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, 
ctrl.FieldOwner("camel-k-operator"))
+}
+
+func (a *ServerOrClientSideApplier) clientSideApply(ctx context.Context, 
resource ctrl.Object) error {
+       err := a.Client.Create(ctx, resource)
+       if err == nil {
+               return nil
+       } else if !k8serrors.IsAlreadyExists(err) {
+               return fmt.Errorf("error during create resource: %s/%s: %w", 
resource.GetNamespace(), resource.GetName(), err)
+       }
+       object := &unstructured.Unstructured{}
+       object.SetNamespace(resource.GetNamespace())
+       object.SetName(resource.GetName())
+       object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
+       err = a.Client.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
+       if err != nil {
+               return err
+       }
+       p, err := patch.PositiveMergePatch(object, resource)
+       if err != nil {
+               return err
+       } else if len(p) == 0 {
+               return nil
+       }
+       return a.Client.Patch(ctx, resource, 
ctrl.RawPatch(types.MergePatchType, p))
+}
+
+func isIncompatibleServerError(err error) bool {
+       // First simpler check for older servers (i.e. OpenShift 3.11)
+       if strings.Contains(err.Error(), "415: Unsupported Media Type") {
+               return true
+       }
+       // 415: Unsupported media type means we're talking to a server which 
doesn't
+       // support server-side apply.
+       var serr *k8serrors.StatusError
+       if errors.As(err, &serr) {
+               return serr.Status().Code == http.StatusUnsupportedMediaType
+       }
+       // Non-StatusError means the error isn't because the server is 
incompatible.
+       return false
+}
diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go
index 82a818b..fc64e25 100644
--- a/pkg/install/kamelets.go
+++ b/pkg/install/kamelets.go
@@ -19,25 +19,16 @@ package install
 
 import (
        "context"
-       "errors"
        "fmt"
        "io/fs"
-       "net/http"
        "os"
        "path"
        "path/filepath"
        "strings"
-       "sync"
-       "sync/atomic"
 
        "golang.org/x/sync/errgroup"
 
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime"
-       "k8s.io/apimachinery/pkg/types"
-
-       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
        logf "sigs.k8s.io/controller-runtime/pkg/log"
 
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -45,7 +36,6 @@ import (
        "github.com/apache/camel-k/pkg/util"
        "github.com/apache/camel-k/pkg/util/defaults"
        "github.com/apache/camel-k/pkg/util/kubernetes"
-       "github.com/apache/camel-k/pkg/util/patch"
 )
 
 const (
@@ -55,9 +45,6 @@ const (
 
 var (
        log = logf.Log
-
-       hasServerSideApply atomic.Value
-       tryServerSideApply sync.Once
 )
 
 // KameletCatalog installs the bundled Kamelets into the specified namespace.
@@ -77,7 +64,7 @@ func KameletCatalog(ctx context.Context, c client.Client, 
namespace string) erro
        }
 
        g, gCtx := errgroup.WithContext(ctx)
-
+       applier := c.ServerOrClientSideApplier()
        err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err 
error) error {
                if err != nil {
                        return err
@@ -94,31 +81,9 @@ func KameletCatalog(ctx context.Context, c client.Client, 
namespace string) erro
                        if err != nil {
                                return err
                        }
-                       once := false
-                       tryServerSideApply.Do(func() {
-                               once = true
-                               if err = serverSideApply(gCtx, c, kamelet); err 
!= nil {
-                                       if isIncompatibleServerError(err) {
-                                               log.Info("Fallback to 
client-side apply for installing bundled Kamelets")
-                                               hasServerSideApply.Store(false)
-                                               err = nil
-                                       } else {
-                                               tryServerSideApply = sync.Once{}
-                                       }
-                               } else {
-                                       hasServerSideApply.Store(true)
-                               }
-                       })
-                       if err != nil {
+                       if err := applier.Apply(gCtx, kamelet); err != nil {
                                return err
                        }
-                       if v := hasServerSideApply.Load(); v.(bool) {
-                               if !once {
-                                       return serverSideApply(gCtx, c, kamelet)
-                               }
-                       } else {
-                               return clientSideApply(gCtx, c, kamelet)
-                       }
                        return nil
                })
                return nil
@@ -130,53 +95,6 @@ func KameletCatalog(ctx context.Context, c client.Client, 
namespace string) erro
        return g.Wait()
 }
 
-func serverSideApply(ctx context.Context, c client.Client, resource 
runtime.Object) error {
-       target, err := patch.PositiveApplyPatch(resource)
-       if err != nil {
-               return err
-       }
-       return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, 
ctrl.FieldOwner("camel-k-operator"))
-}
-
-func clientSideApply(ctx context.Context, c client.Client, resource 
ctrl.Object) error {
-       err := c.Create(ctx, resource)
-       if err == nil {
-               return nil
-       } else if !k8serrors.IsAlreadyExists(err) {
-               return fmt.Errorf("error during create resource: %s/%s: %w", 
resource.GetNamespace(), resource.GetName(), err)
-       }
-       object := &unstructured.Unstructured{}
-       object.SetNamespace(resource.GetNamespace())
-       object.SetName(resource.GetName())
-       object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
-       err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
-       if err != nil {
-               return err
-       }
-       p, err := patch.PositiveMergePatch(object, resource)
-       if err != nil {
-               return err
-       } else if len(p) == 0 {
-               return nil
-       }
-       return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
-}
-
-func isIncompatibleServerError(err error) bool {
-       // First simpler check for older servers (i.e. OpenShift 3.11)
-       if strings.Contains(err.Error(), "415: Unsupported Media Type") {
-               return true
-       }
-       // 415: Unsupported media type means we're talking to a server which 
doesn't
-       // support server-side apply.
-       var serr *k8serrors.StatusError
-       if errors.As(err, &serr) {
-               return serr.Status().Code == http.StatusUnsupportedMediaType
-       }
-       // Non-StatusError means the error isn't because the server is 
incompatible.
-       return false
-}
-
 func loadKamelet(path string, namespace string, scheme *runtime.Scheme) 
(*v1alpha1.Kamelet, error) {
        content, err := util.ReadFile(path)
        if err != nil {
diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go
index 50d32fb..b4f6db4 100644
--- a/pkg/util/test/client.go
+++ b/pkg/util/test/client.go
@@ -117,6 +117,12 @@ func (c *FakeClient) Discovery() 
discovery.DiscoveryInterface {
        }
 }
 
+func (c *FakeClient) ServerOrClientSideApplier() 
client.ServerOrClientSideApplier {
+       return client.ServerOrClientSideApplier{
+               Client: c,
+       }
+}
+
 type FakeDiscovery struct {
        discovery.DiscoveryInterface
 }

Reply via email to