This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b46f84  Fix #1785: propagate klb changes to integrations
3b46f84 is described below

commit 3b46f849d2ce159ce0b8a32ac6592d9784ba898d
Author: nicolaferraro <ni.ferr...@gmail.com>
AuthorDate: Mon Oct 26 15:11:41 2020 +0100

    Fix #1785: propagate klb changes to integrations
---
 e2e/knative/files/display.groovy                   |  20 ++++
 e2e/knative/kamelet_test.go                        |  60 ++++++++++++
 e2e/support/test_support.go                        |  90 ++++++++++++++++-
 pkg/cmd/reset.go                                   |  16 +--
 .../kameletbinding/{initialize.go => common.go}    | 101 ++-----------------
 pkg/controller/kameletbinding/initialize.go        | 108 +--------------------
 .../kameletbinding/kamelet_binding_controller.go   |  10 ++
 pkg/controller/kameletbinding/monitor.go           |  39 +++++++-
 8 files changed, 233 insertions(+), 211 deletions(-)

diff --git a/e2e/knative/files/display.groovy b/e2e/knative/files/display.groovy
new file mode 100644
index 0000000..43a595f
--- /dev/null
+++ b/e2e/knative/files/display.groovy
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('knative:channel/messages')
+    .convertBodyTo(String.class)
+    .to('log:info?showAll=false')
diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go
new file mode 100644
index 0000000..cfc90f3
--- /dev/null
+++ b/e2e/knative/kamelet_test.go
@@ -0,0 +1,60 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package knative
+
+import (
+       "testing"
+
+       . "github.com/apache/camel-k/e2e/support"
+       camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       . "github.com/onsi/gomega"
+       v1 "k8s.io/api/core/v1"
+       messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+)
+
+// Test that kamelet binding can be changed and changes propagated to integrations
+func TestKameletChange(t *testing.T) {
+
+       WithNewTestNamespace(t, func(ns string) {
+               RegisterTestingT(t)
+
+               Expect(Kamel("install", "-n", ns).Execute()).Should(BeNil())
+               Expect(CreateTimerKamelet(ns, "timer-source")()).Should(BeNil())
+               Expect(CreateKnativeChannelv1Beta1(ns, "messages")()).Should(BeNil())
+               Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).Should(BeNil())
+               ref := v1.ObjectReference{
+                       Kind:       "InMemoryChannel",
+                       Name:       "messages",
+                       APIVersion: messaging.SchemeGroupVersion.String(),
+               }
+               Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).Should(BeNil())
+               Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+               Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+               Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello"))
+
+               Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).Should(BeNil())
+               Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+               Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+               Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi"))
+       })
+
+}
diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go
index c070c87..e572602 100644
--- a/e2e/support/test_support.go
+++ b/e2e/support/test_support.go
@@ -23,6 +23,7 @@ package support
 
 import (
        "context"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
@@ -33,9 +34,10 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
        "github.com/google/uuid"
        "github.com/onsi/gomega"
-
        "github.com/spf13/cobra"
        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/api/batch/v1beta1"
@@ -891,6 +893,92 @@ func CreateKnativeChannelv1Beta1(ns string, name string) func() error {
 }
 
 /*
+       Kamelets
+*/
+
+func CreateTimerKamelet(ns string, name string) func() error {
+       return func() error {
+               kamelet := v1alpha1.Kamelet{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Namespace: ns,
+                               Name:      name,
+                       },
+                       Spec: v1alpha1.KameletSpec{
+                               Definition: v1alpha1.JSONSchemaProps{
+                                       Properties: map[string]v1alpha1.JSONSchemaProps{
+                                               "message": {
+                                                       Type: "string",
+                                               },
+                                       },
+                               },
+                               Flow: asFlow(map[string]interface{}{
+                                       "from": map[string]interface{}{
+                                               "uri": "timer:tick",
+                                               "steps": []map[string]interface{}{
+                                                       {
+                                                               "set-body": map[string]interface{}{
+                                                                       "constant": "{{message}}",
+                                                               },
+                                                       },
+                                                       {
+                                                               "to": "kamelet:sink",
+                                                       },
+                                               },
+                                       },
+                               }),
+                       },
+               }
+               return TestClient.Create(TestContext, &kamelet)
+       }
+}
+
+func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error {
+       return func() error {
+               kb := v1alpha1.KameletBinding{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Namespace: ns,
+                               Name:      name,
+                       },
+                       Spec: v1alpha1.KameletBindingSpec{
+                               Source: v1alpha1.Endpoint{
+                                       Ref: &corev1.ObjectReference{
+                                               Kind:       "Kamelet",
+                                               APIVersion: v1alpha1.SchemeGroupVersion.String(),
+                                               Name:       from,
+                                       },
+                                       Properties: asEndpointProperties(properties),
+                               },
+                               Sink: v1alpha1.Endpoint{
+                                       Ref:        &to,
+                                       Properties: asEndpointProperties(map[string]string{}),
+                               },
+                       },
+               }
+               return kubernetes.ReplaceResource(TestContext, TestClient, &kb)
+       }
+}
+
+func asFlow(source map[string]interface{}) *v1.Flow {
+       bytes, err := json.Marshal(source)
+       if err != nil {
+               panic(err)
+       }
+       return &v1.Flow{
+               RawMessage: bytes,
+       }
+}
+
+func asEndpointProperties(props map[string]string) v1alpha1.EndpointProperties {
+       bytes, err := json.Marshal(props)
+       if err != nil {
+               panic(err)
+       }
+       return v1alpha1.EndpointProperties{
+               RawMessage: bytes,
+       }
+}
+
+/*
        Namespace testing functions
 */
 
diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go
index f6fa6f6..5831d85 100644
--- a/pkg/cmd/reset.go
+++ b/pkg/cmd/reset.go
@@ -61,6 +61,14 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
        }
 
        var n int
+       if !o.SkipKameletBindings {
+               if n, err = o.deleteAllKameletBindings(c); err != nil {
+                       fmt.Print(err)
+                       return
+               }
+               fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
+       }
+
        if !o.SkipIntegrations {
                if n, err = o.deleteAllIntegrations(c); err != nil {
                        fmt.Print(err)
@@ -77,14 +85,6 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
                fmt.Printf("%d integration kits deleted from namespace %s\n", n, o.Namespace)
        }
 
-       if !o.SkipKameletBindings {
-               if n, err = o.deleteAllKameletBindings(c); err != nil {
-                       fmt.Print(err)
-                       return
-               }
-               fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
-       }
-
        if err = o.resetIntegrationPlatform(c); err != nil {
                fmt.Println(err)
                return
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/common.go
similarity index 51%
copy from pkg/controller/kameletbinding/initialize.go
copy to pkg/controller/kameletbinding/common.go
index 8f794d6..a50b202 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -20,41 +20,19 @@ package kameletbinding
 import (
        "context"
        "encoding/json"
-       "strings"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       "github.com/apache/camel-k/pkg/client"
        "github.com/apache/camel-k/pkg/platform"
        "github.com/apache/camel-k/pkg/util/bindings"
        "github.com/apache/camel-k/pkg/util/knative"
-       "github.com/apache/camel-k/pkg/util/kubernetes"
-       "github.com/apache/camel-k/pkg/util/patch"
        "github.com/pkg/errors"
-       corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-       "k8s.io/apimachinery/pkg/types"
-       "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-// NewInitializeAction returns a action that initializes the kamelet binding configuration when not provided by the user
-func NewInitializeAction() Action {
-       return &initializeAction{}
-}
-
-type initializeAction struct {
-       baseAction
-}
-
-func (action *initializeAction) Name() string {
-       return "initialize"
-}
-
-func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBinding) bool {
-       return kameletbinding.Status.Phase == v1alpha1.KameletBindingPhaseNone
-}
-
-func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
+func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) {
        controller := true
        blockOwnerDeletion := true
        it := v1.Integration{
@@ -78,7 +56,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
                it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
        }
 
-       profile, err := action.determineProfile(ctx, kameletbinding)
+       profile, err := determineProfile(ctx, c, kameletbinding)
        if err != nil {
                return nil, err
        }
@@ -86,7 +64,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 
        bindingContext := bindings.BindingContext{
                Ctx:       ctx,
-               Client:    action.client,
+               Client:    c,
                Namespace: it.Namespace,
                Profile:   profile,
        }
@@ -128,77 +106,14 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
        }
        it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
 
-       if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
-               return nil, errors.Wrap(err, "could not create integration for kamelet binding")
-       }
-
-       // propagate Kamelet icon (best effort)
-       action.propagateIcon(ctx, kameletbinding)
-
-       target := kameletbinding.DeepCopy()
-       target.Status.Phase = v1alpha1.KameletBindingPhaseCreating
-       return target, nil
-}
-
-func (action *initializeAction) propagateIcon(ctx context.Context, binding *v1alpha1.KameletBinding) {
-       icon, err := action.findIcon(ctx, binding)
-       if err != nil {
-               action.L.Errorf(err, "cannot find icon for kamelet binding %q", binding.Name)
-               return
-       }
-       if icon == "" {
-               return
-       }
-       // compute patch
-       clone := binding.DeepCopy()
-       clone.Annotations = make(map[string]string)
-       for k, v := range binding.Annotations {
-               clone.Annotations[k] = v
-       }
-       if _, ok := clone.Annotations[v1alpha1.AnnotationIcon]; !ok {
-               clone.Annotations[v1alpha1.AnnotationIcon] = icon
-       }
-       p, err := patch.PositiveMergePatch(binding, clone)
-       if err != nil {
-               action.L.Errorf(err, "cannot compute patch to update icon for kamelet binding %q", binding.Name)
-               return
-       }
-       if len(p) > 0 {
-               if err := action.client.Patch(ctx, clone, client.RawPatch(types.MergePatchType, p)); err != nil {
-                       action.L.Errorf(err, "cannot apply merge patch to update icon for kamelet binding %q", binding.Name)
-                       return
-               }
-       }
-}
-
-func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.KameletBinding) (string, error) {
-       var kameletRef *corev1.ObjectReference
-       if binding.Spec.Source.Ref != nil && binding.Spec.Source.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Source.Ref.APIVersion, "camel.apache.org/") {
-               kameletRef = binding.Spec.Source.Ref
-       } else if binding.Spec.Sink.Ref != nil && binding.Spec.Sink.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Sink.Ref.APIVersion, "camel.apache.org/") {
-               kameletRef = binding.Spec.Sink.Ref
-       }
-
-       if kameletRef == nil {
-               return "", nil
-       }
-
-       key := client.ObjectKey{
-               Namespace: binding.Namespace,
-               Name:      kameletRef.Name,
-       }
-       var kamelet v1alpha1.Kamelet
-       if err := action.client.Get(ctx, key, &kamelet); err != nil {
-               return "", err
-       }
-       return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
+       return &it, nil
 }
 
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
+func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
        if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
                return binding.Spec.Integration.Profile, nil
        }
-       pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
+       pl, err := platform.GetCurrentPlatform(ctx, c, binding.Namespace)
        if err != nil && !k8serrors.IsNotFound(err) {
                return "", errors.Wrap(err, "error while retrieving the integration platform")
        }
@@ -210,7 +125,7 @@ func (action *initializeAction) determineProfile(ctx context.Context, binding *v
                        return pl.Spec.Profile, nil
                }
        }
-       if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
+       if knative.IsEnabledInNamespace(ctx, c, binding.Namespace) {
                return v1.TraitProfileKnative, nil
        }
        if pl != nil {
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go
index 8f794d6..cc9c38f 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/initialize.go
@@ -19,20 +19,13 @@ package kameletbinding
 
 import (
        "context"
-       "encoding/json"
        "strings"
 
-       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
-       "github.com/apache/camel-k/pkg/platform"
-       "github.com/apache/camel-k/pkg/util/bindings"
-       "github.com/apache/camel-k/pkg/util/knative"
        "github.com/apache/camel-k/pkg/util/kubernetes"
        "github.com/apache/camel-k/pkg/util/patch"
        "github.com/pkg/errors"
        corev1 "k8s.io/api/core/v1"
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
-       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -55,80 +48,12 @@ func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBindin
 }
 
 func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
-       controller := true
-       blockOwnerDeletion := true
-       it := v1.Integration{
-               ObjectMeta: metav1.ObjectMeta{
-                       Namespace: kameletbinding.Namespace,
-                       Name:      kameletbinding.Name,
-                       OwnerReferences: []metav1.OwnerReference{
-                               {
-                                       APIVersion:         kameletbinding.APIVersion,
-                                       Kind:               kameletbinding.Kind,
-                                       Name:               kameletbinding.Name,
-                                       UID:                kameletbinding.UID,
-                                       Controller:         &controller,
-                                       BlockOwnerDeletion: &blockOwnerDeletion,
-                               },
-                       },
-               },
-       }
-       // start from the integration spec defined in the binding
-       if kameletbinding.Spec.Integration != nil {
-               it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
-       }
-
-       profile, err := action.determineProfile(ctx, kameletbinding)
-       if err != nil {
-               return nil, err
-       }
-       it.Spec.Profile = profile
-
-       bindingContext := bindings.BindingContext{
-               Ctx:       ctx,
-               Client:    action.client,
-               Namespace: it.Namespace,
-               Profile:   profile,
-       }
-
-       from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source)
-       if err != nil {
-               return nil, errors.Wrap(err, "could not determine source URI")
-       }
-       to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink)
-       if err != nil {
-               return nil, errors.Wrap(err, "could not determine sink URI")
-       }
-
-       if len(from.Traits) > 0 || len(to.Traits) > 0 {
-               if it.Spec.Traits == nil {
-                       it.Spec.Traits = make(map[string]v1.TraitSpec)
-               }
-               for k, v := range from.Traits {
-                       it.Spec.Traits[k] = v
-               }
-               for k, v := range to.Traits {
-                       it.Spec.Traits[k] = v
-               }
-       }
-
-       flow := map[string]interface{}{
-               "from": map[string]interface{}{
-                       "uri": from.URI,
-                       "steps": []map[string]interface{}{
-                               {
-                                       "to": to.URI,
-                               },
-                       },
-               },
-       }
-       encodedFlow, err := json.Marshal(flow)
+       it, err := createIntegrationFor(ctx, action.client, kameletbinding)
        if err != nil {
                return nil, err
        }
-       it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
 
-       if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
+       if err := kubernetes.ReplaceResource(ctx, action.client, it); err != nil {
                return nil, errors.Wrap(err, "could not create integration for kamelet binding")
        }
 
@@ -193,32 +118,3 @@ func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.
        }
        return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
 }
-
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
-       if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
-               return binding.Spec.Integration.Profile, nil
-       }
-       pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
-       if err != nil && !k8serrors.IsNotFound(err) {
-               return "", errors.Wrap(err, "error while retrieving the integration platform")
-       }
-       if pl != nil {
-               if pl.Status.Profile != "" {
-                       return pl.Status.Profile, nil
-               }
-               if pl.Spec.Profile != "" {
-                       return pl.Spec.Profile, nil
-               }
-       }
-       if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
-               return v1.TraitProfileKnative, nil
-       }
-       if pl != nil {
-               // Determine profile from cluster type
-               plProfile := platform.GetProfile(pl)
-               if plProfile != "" {
-                       return plProfile, nil
-               }
-       }
-       return v1.DefaultTraitProfile, nil
-}
diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go
index 459c69c..dbebfc3 100644
--- a/pkg/controller/kameletbinding/kamelet_binding_controller.go
+++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go
@@ -21,6 +21,7 @@ import (
        "context"
        "time"
 
+       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
        "github.com/apache/camel-k/pkg/client"
        camelevent "github.com/apache/camel-k/pkg/event"
@@ -84,6 +85,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                return err
        }
 
+       // Watch Integration to propagate changes downstream
+       err = c.Watch(&source.Kind{Type: &v1.Integration{}}, &handler.EnqueueRequestForOwner{
+               OwnerType:    &v1alpha1.KameletBinding{},
+               IsController: false,
+       })
+       if err != nil {
+               return err
+       }
+
        return nil
 }
 
diff --git a/pkg/controller/kameletbinding/monitor.go b/pkg/controller/kameletbinding/monitor.go
index 9980dc5..9cd768a 100644
--- a/pkg/controller/kameletbinding/monitor.go
+++ b/pkg/controller/kameletbinding/monitor.go
@@ -19,12 +19,14 @@ package kameletbinding
 
 import (
        "context"
+
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
        "github.com/pkg/errors"
        corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/equality"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "sigs.k8s.io/controller-runtime/pkg/client"
-
-       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
 // NewMonitorAction returns an action that monitors the kamelet binding after it's fully initialized
@@ -52,10 +54,41 @@ func (action *monitorAction) Handle(ctx context.Context, kameletbinding *v1alpha
                Name:      kameletbinding.Name,
        }
        it := v1.Integration{}
-       if err := action.client.Get(ctx, key, &it); err != nil {
+       if err := action.client.Get(ctx, key, &it); err != nil && k8serrors.IsNotFound(err) {
+               target := kameletbinding.DeepCopy()
+               // Rebuild the integration
+               target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+               target.Status.SetCondition(
+                       v1alpha1.KameletBindingConditionReady,
+                       corev1.ConditionFalse,
+                       "",
+                       "",
+               )
+               return target, nil
+       } else if err != nil {
                return nil, errors.Wrapf(err, "could not load integration for KameletBinding %q", kameletbinding.Name)
        }
 
+       // Check if the integration needs to be changed
+       expected, err := createIntegrationFor(ctx, action.client, kameletbinding)
+       if err != nil {
+               return nil, err
+       }
+
+       if !equality.Semantic.DeepDerivative(expected.Spec, it.Spec) {
+               // KameletBinding has changed and needs rebuild
+               target := kameletbinding.DeepCopy()
+               // Rebuild the integration
+               target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+               target.Status.SetCondition(
+                       v1alpha1.KameletBindingConditionReady,
+                       corev1.ConditionFalse,
+                       "",
+                       "",
+               )
+               return target, nil
+       }
+
        // Map integration phases to KameletBinding phases
        target := kameletbinding.DeepCopy()
        if it.Status.Phase == v1.IntegrationPhaseRunning {

Reply via email to