This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 5a7e49cf585aa987549f76f9301c92b40323acba Author: nicolaferraro <ni.ferr...@gmail.com> AuthorDate: Wed Dec 15 11:16:46 2021 +0100 Fix #1107: generalize server side apply code and reuse --- addons/keda/keda.go | 6 +-- pkg/client/client.go | 1 + pkg/client/serverside.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++ pkg/install/kamelets.go | 86 +------------------------------- pkg/util/test/client.go | 6 +++ 5 files changed, 136 insertions(+), 87 deletions(-) diff --git a/addons/keda/keda.go b/addons/keda/keda.go index 65a8bd4..f59edd9 100644 --- a/addons/keda/keda.go +++ b/addons/keda/keda.go @@ -117,7 +117,7 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) (*kedav1alpha1.ScaledO func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error { ctrlRef := t.getTopControllerReference(e) - + applier := e.Client.ServerOrClientSideApplier() if ctrlRef.Kind == camelv1alpha1.KameletBindingKind { // Update the KameletBinding directly (do not add it to env resources, it's the integration parent) key := client.ObjectKey{ @@ -131,7 +131,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error { if klb.Spec.Replicas == nil { one := int32(1) klb.Spec.Replicas = &one - if err := e.Client.Update(e.Ctx, &klb); err != nil { + if err := applier.Apply(e.Ctx, &klb); err != nil { return err } } @@ -139,7 +139,7 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error { if e.Integration.Spec.Replicas == nil { one := int32(1) e.Integration.Spec.Replicas = &one - if err := e.Client.Update(e.Ctx, e.Integration); err != nil { + if err := applier.Apply(e.Ctx, e.Integration); err != nil { return err } } diff --git a/pkg/client/client.go b/pkg/client/client.go index 3334e70..2cf73c2 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -63,6 +63,7 @@ type Client interface { GetScheme() *runtime.Scheme GetConfig() *rest.Config GetCurrentNamespace(kubeConfig string) (string, error) + ServerOrClientSideApplier() ServerOrClientSideApplier } // Injectable identifies objects that can receive a Client. diff --git a/pkg/client/serverside.go b/pkg/client/serverside.go new file mode 100644 index 0000000..6efd758 --- /dev/null +++ b/pkg/client/serverside.go @@ -0,0 +1,124 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" + + "github.com/apache/camel-k/pkg/util/log" + "github.com/apache/camel-k/pkg/util/patch" + "github.com/pkg/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ServerOrClientSideApplier struct { + Client ctrl.Client + hasServerSideApply atomic.Value + tryServerSideApply sync.Once +} + +func (c *defaultClient) ServerOrClientSideApplier() ServerOrClientSideApplier { + return ServerOrClientSideApplier{ + Client: c, + } +} + +func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Object) error { + once := false + var err error + a.tryServerSideApply.Do(func() { + once = true + if err = a.serverSideApply(ctx, object); err != nil { + if isIncompatibleServerError(err) { + log.Info("Fallback to client-side apply for installing resources") + a.hasServerSideApply.Store(false) + err = nil + } else { + a.tryServerSideApply = sync.Once{} + } + } else { + a.hasServerSideApply.Store(true) + } + }) + if err != nil { + return err + } + if v := a.hasServerSideApply.Load(); v.(bool) { + if !once { + return a.serverSideApply(ctx, object) + } + } else { + return a.clientSideApply(ctx, object) + } + return nil +} + +func (a *ServerOrClientSideApplier) serverSideApply(ctx context.Context, resource ctrl.Object) error { + target, err := patch.PositiveApplyPatch(resource) + if err != nil { + return err + } + return a.Client.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) +} + +func (a *ServerOrClientSideApplier) clientSideApply(ctx context.Context, resource ctrl.Object) error { + err := a.Client.Create(ctx, resource) + if err == nil { + return nil + } else if !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + object := &unstructured.Unstructured{} + object.SetNamespace(resource.GetNamespace()) + object.SetName(resource.GetName()) + object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind()) + err = a.Client.Get(ctx, ctrl.ObjectKeyFromObject(object), object) + if err != nil { + return err + } + p, err := patch.PositiveMergePatch(object, resource) + if err != nil { + return err + } else if len(p) == 0 { + return nil + } + return a.Client.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p)) +} + +func isIncompatibleServerError(err error) bool { + // First simpler check for older servers (i.e. OpenShift 3.11) + if strings.Contains(err.Error(), "415: Unsupported Media Type") { + return true + } + // 415: Unsupported media type means we're talking to a server which doesn't + // support server-side apply. + var serr *k8serrors.StatusError + if errors.As(err, &serr) { + return serr.Status().Code == http.StatusUnsupportedMediaType + } + // Non-StatusError means the error isn't because the server is incompatible. + return false +} diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go index 82a818b..fc64e25 100644 --- a/pkg/install/kamelets.go +++ b/pkg/install/kamelets.go @@ -19,25 +19,16 @@ package install import ( "context" - "errors" "fmt" "io/fs" - "net/http" "os" "path" "path/filepath" "strings" - "sync" - "sync/atomic" "golang.org/x/sync/errgroup" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - - ctrl "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -45,7 +36,6 @@ import ( "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" "github.com/apache/camel-k/pkg/util/kubernetes" - "github.com/apache/camel-k/pkg/util/patch" ) const ( @@ -55,9 +45,6 @@ const ( var ( log = logf.Log - - hasServerSideApply atomic.Value - tryServerSideApply sync.Once ) // KameletCatalog installs the bundled Kamelets into the specified namespace. @@ -77,7 +64,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro } g, gCtx := errgroup.WithContext(ctx) - + applier := c.ServerOrClientSideApplier() err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error { if err != nil { return err @@ -94,31 +81,9 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro if err != nil { return err } - once := false - tryServerSideApply.Do(func() { - once = true - if err = serverSideApply(gCtx, c, kamelet); err != nil { - if isIncompatibleServerError(err) { - log.Info("Fallback to client-side apply for installing bundled Kamelets") - hasServerSideApply.Store(false) - err = nil - } else { - tryServerSideApply = sync.Once{} - } - } else { - hasServerSideApply.Store(true) - } - }) - if err != nil { + if err := applier.Apply(gCtx, kamelet); err != nil { return err } - if v := hasServerSideApply.Load(); v.(bool) { - if !once { - return serverSideApply(gCtx, c, kamelet) - } - } else { - return clientSideApply(gCtx, c, kamelet) - } return nil }) return nil @@ -130,53 +95,6 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro return g.Wait() } -func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error { - target, err := patch.PositiveApplyPatch(resource) - if err != nil { - return err - } - return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) -} - -func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error { - err := c.Create(ctx, resource) - if err == nil { - return nil - } else if !k8serrors.IsAlreadyExists(err) { - return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) - } - object := &unstructured.Unstructured{} - object.SetNamespace(resource.GetNamespace()) - object.SetName(resource.GetName()) - object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind()) - err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object) - if err != nil { - return err - } - p, err := patch.PositiveMergePatch(object, resource) - if err != nil { - return err - } else if len(p) == 0 { - return nil - } - return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p)) -} - -func isIncompatibleServerError(err error) bool { - // First simpler check for older servers (i.e. OpenShift 3.11) - if strings.Contains(err.Error(), "415: Unsupported Media Type") { - return true - } - // 415: Unsupported media type means we're talking to a server which doesn't - // support server-side apply. - var serr *k8serrors.StatusError - if errors.As(err, &serr) { - return serr.Status().Code == http.StatusUnsupportedMediaType - } - // Non-StatusError means the error isn't because the server is incompatible. - return false -} - func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) { content, err := util.ReadFile(path) if err != nil { diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go index 50d32fb..b4f6db4 100644 --- a/pkg/util/test/client.go +++ b/pkg/util/test/client.go @@ -117,6 +117,12 @@ func (c *FakeClient) Discovery() discovery.DiscoveryInterface { } } +func (c *FakeClient) ServerOrClientSideApplier() client.ServerOrClientSideApplier { + return client.ServerOrClientSideApplier{ + Client: c, + } +} + type FakeDiscovery struct { discovery.DiscoveryInterface }