This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 3d7715612a7437ba5ec6b30651dabe7540ea10bd
Author: Pasquale Congiusti <pasquale.congiu...@gmail.com>
AuthorDate: Fri Dec 1 15:48:32 2023 +0100

    chore: synthetic Integration separate controller
---
 config/rbac/namespaced/operator-role-knative.yaml  |   8 -
 config/rbac/namespaced/operator-role.yaml          |   1 +
 helm/camel-k/templates/operator-role.yaml          |   1 +
 pkg/cmd/operator/operator.go                       |   3 +
 .../integration/integration_controller.go          |  48 +---
 .../integration/integration_controller_import.go   | 249 -----------------
 pkg/controller/integration/monitor.go              |   1 -
 pkg/controller/integration/monitor_synthetic.go    |  18 --
 pkg/controller/integration/predicate.go            |  37 ---
 pkg/controller/pipe/pipe_controller.go             |   2 +-
 pkg/controller/synthetic/synthetic.go              | 300 +++++++++++++++++++++
 11 files changed, 312 insertions(+), 356 deletions(-)

diff --git a/config/rbac/namespaced/operator-role-knative.yaml 
b/config/rbac/namespaced/operator-role-knative.yaml
index 7e1d2f349..3cba80931 100644
--- a/config/rbac/namespaced/operator-role-knative.yaml
+++ b/config/rbac/namespaced/operator-role-knative.yaml
@@ -35,14 +35,6 @@ rules:
   - patch
   - update
   - watch
-- apiGroups:
-  - serving.knative.dev
-  resources:
-  - revisions
-  verbs:
-  - get
-  - list
-  - watch
 - apiGroups:
   - eventing.knative.dev
   resources:
diff --git a/config/rbac/namespaced/operator-role.yaml 
b/config/rbac/namespaced/operator-role.yaml
index 4ddc2d4c1..0f364463e 100644
--- a/config/rbac/namespaced/operator-role.yaml
+++ b/config/rbac/namespaced/operator-role.yaml
@@ -45,6 +45,7 @@ rules:
   - camel.apache.org
   resources:
   - builds
+  - integrations
   verbs:
   - delete
 - apiGroups:
diff --git a/helm/camel-k/templates/operator-role.yaml 
b/helm/camel-k/templates/operator-role.yaml
index b8e709b80..40ef9742a 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -54,6 +54,7 @@ rules:
   - camel.apache.org
   resources:
   - builds
+  - integrations
   verbs:
   - delete
 - apiGroups:
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 04b5ea8b2..12edd7cc1 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -59,6 +59,7 @@ import (
        v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
        "github.com/apache/camel-k/v2/pkg/client"
        "github.com/apache/camel-k/v2/pkg/controller"
+       "github.com/apache/camel-k/v2/pkg/controller/synthetic"
        "github.com/apache/camel-k/v2/pkg/event"
        "github.com/apache/camel-k/v2/pkg/install"
        "github.com/apache/camel-k/v2/pkg/platform"
@@ -231,6 +232,8 @@ func Run(healthPort, monitoringPort int32, leaderElection 
bool, leaderElectionID
        install.OperatorStartupOptionalTools(installCtx, bootstrapClient, 
watchNamespace, operatorNamespace, log)
        exitOnError(findOrCreateIntegrationPlatform(installCtx, 
bootstrapClient, operatorNamespace), "failed to create integration platform")
 
+       log.Info("Starting the synthetic Integration manager")
+       exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, 
mgr.GetCache(), mgr.GetAPIReader()), "synthetic Integration manager error")
        log.Info("Starting the manager")
        exitOnError(mgr.Start(ctx), "manager exited non-zero")
 }
diff --git a/pkg/controller/integration/integration_controller.go 
b/pkg/controller/integration/integration_controller.go
index 1979b9d4a..a16aa6967 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -328,7 +328,7 @@ func add(ctx context.Context, mgr manager.Manager, c 
client.Client, r reconcile.
        watchIntegrationResources(c, b)
        // Watch for the CronJob conditionally
        if ok, err := kubernetes.IsAPIResourceInstalled(c, 
batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); 
ok && err == nil {
-               watchCronJobResources(c, b)
+               watchCronJobResources(b)
        }
        // Watch for the Knative Services conditionally
        if ok, err := kubernetes.IsAPIResourceInstalled(c, 
servingv1.SchemeGroupVersion.String(), 
reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
@@ -405,37 +405,13 @@ func watchIntegrationResources(c client.Client, b 
*builder.Builder) {
                                        },
                                }
                        })).
-               // Watch for non managed Deployments (ie, imported)
-               Watches(&appsv1.Deployment{},
-                       handler.EnqueueRequestsFromMapFunc(func(ctx 
context.Context, a ctrl.Object) []reconcile.Request {
-                               deploy, ok := a.(*appsv1.Deployment)
-                               if !ok {
-                                       log.Error(fmt.Errorf("type assertion 
failed: %v", a), "Failed to retrieve to retrieve Deployment")
-                                       return []reconcile.Request{}
-                               }
-                               return 
nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, 
&NonManagedCamelDeployment{deploy: deploy})
-                       }),
-                       builder.WithPredicates(NonManagedObjectPredicate{}),
-               ).
                // Watch for the owned Deployments
                Owns(&appsv1.Deployment{}, 
builder.WithPredicates(StatusChangedPredicate{}))
 }
 
-func watchCronJobResources(c client.Client, b *builder.Builder) {
-       // Watch for non managed Deployments (ie, imported)
-       b.Watches(&batchv1.CronJob{},
-               handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a 
ctrl.Object) []reconcile.Request {
-                       cron, ok := a.(*batchv1.CronJob)
-                       if !ok {
-                               log.Error(fmt.Errorf("type assertion failed: 
%v", a), "Failed to retrieve to retrieve CronJob")
-                               return []reconcile.Request{}
-                       }
-                       return 
nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, 
&NonManagedCamelCronjob{cron: cron})
-               }),
-               builder.WithPredicates(NonManagedObjectPredicate{}),
-       ).
-               // Watch for the owned CronJobs
-               Owns(&batchv1.CronJob{}, 
builder.WithPredicates(StatusChangedPredicate{}))
+func watchCronJobResources(b *builder.Builder) {
+       // Watch for the owned CronJobs
+       b.Owns(&batchv1.CronJob{}, 
builder.WithPredicates(StatusChangedPredicate{}))
 }
 
 func watchKnativeResources(ctx context.Context, c client.Client, b 
*builder.Builder) error {
@@ -445,20 +421,8 @@ func watchKnativeResources(ctx context.Context, c 
client.Client, b *builder.Buil
        if ok, err := kubernetes.CheckPermission(checkCtx, c, 
serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", 
"watch"); err != nil {
                return err
        } else if ok {
-               // Watch for non managed Knative Service (ie, imported)
-               b.Watches(&servingv1.Service{},
-                       handler.EnqueueRequestsFromMapFunc(func(ctx 
context.Context, a ctrl.Object) []reconcile.Request {
-                               ksvc, ok := a.(*servingv1.Service)
-                               if !ok {
-                                       log.Error(fmt.Errorf("type assertion 
failed: %v", a), "Failed to retrieve to retrieve KnativeService")
-                                       return []reconcile.Request{}
-                               }
-                               return 
nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, 
&NonManagedCamelKnativeService{ksvc: ksvc})
-                       }),
-                       builder.WithPredicates(NonManagedObjectPredicate{}),
-               ).
-                       // Watch for the owned CronJobs
-                       Owns(&servingv1.Service{}, 
builder.WithPredicates(StatusChangedPredicate{}))
+               // Watch for the owned Knative Services
+               b.Owns(&servingv1.Service{}, 
builder.WithPredicates(StatusChangedPredicate{}))
        }
        return nil
 }
diff --git a/pkg/controller/integration/integration_controller_import.go 
b/pkg/controller/integration/integration_controller_import.go
deleted file mode 100644
index 403185509..000000000
--- a/pkg/controller/integration/integration_controller_import.go
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package integration
-
-import (
-       "context"
-
-       v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
-       "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
-       "github.com/apache/camel-k/v2/pkg/client"
-       "github.com/apache/camel-k/v2/pkg/util/log"
-       "github.com/apache/camel-k/v2/pkg/util/patch"
-       appsv1 "k8s.io/api/apps/v1"
-       batchv1 "k8s.io/api/batch/v1"
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/apimachinery/pkg/types"
-       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
-       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
-       "sigs.k8s.io/controller-runtime/pkg/reconcile"
-)
-
-// nonManagedCamelAppEnqueueRequestsFromMapFunc represent the function to 
discover the Integration which has to be woke up: it creates a synthetic
-// Integration if the Integration does not exist. This is used to import 
external Camel applications.
-func nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx context.Context, c 
client.Client, adp NonManagedCamelApplicationAdapter) []reconcile.Request {
-       if adp.GetIntegrationName() == "" {
-               return []reconcile.Request{}
-       }
-       it := v1.NewIntegration(adp.GetIntegrationNameSpace(), 
adp.GetIntegrationName())
-       err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it)
-       if err != nil {
-               if k8serrors.IsNotFound(err) {
-                       // We must perform this check to make sure the resource 
is not being deleted.
-                       // In such case it makes no sense to create an 
Integration after it.
-                       err := c.Get(ctx, 
ctrl.ObjectKeyFromObject(adp.GetAppObj()), adp.GetAppObj())
-                       if err != nil {
-                               if k8serrors.IsNotFound(err) {
-                                       return []reconcile.Request{}
-                               }
-                               log.Errorf(err, "Some error happened while 
trying to get %s %s resource", adp.GetName(), adp.GetKind())
-                       }
-                       createSyntheticIntegration(&it, adp)
-                       target, err := patch.ApplyPatch(&it)
-                       if err == nil {
-                               err = c.Patch(ctx, target, ctrl.Apply, 
ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
-                               if err != nil {
-                                       log.Errorf(err, "Some error happened 
while creating a synthetic Integration after %s %s resource", adp.GetName(), 
adp.GetKind())
-                                       return []reconcile.Request{}
-                               }
-                               log.Infof(
-                                       "Created a synthetic Integration %s 
after %s %s",
-                                       it.GetName(),
-                                       adp.GetName(),
-                                       adp.GetKind(),
-                               )
-                               return []reconcile.Request{
-                                       {
-                                               NamespacedName: 
types.NamespacedName{
-                                                       Namespace: it.Namespace,
-                                                       Name:      it.Name,
-                                               },
-                                       },
-                               }
-                       }
-                       if err != nil {
-                               log.Infof("Could not create Integration %s: 
%s", adp.GetIntegrationName(), err.Error())
-                               return []reconcile.Request{}
-                       }
-               }
-               log.Errorf(err, "Could not get Integration %s", it.GetName())
-               return []reconcile.Request{}
-       }
-
-       return []reconcile.Request{
-               {
-                       NamespacedName: types.NamespacedName{
-                               Namespace: it.Namespace,
-                               Name:      it.Name,
-                       },
-               },
-       }
-}
-
-// createSyntheticIntegration set all required values for a synthetic 
Integration.
-func createSyntheticIntegration(it *v1.Integration, adp 
NonManagedCamelApplicationAdapter) {
-       // We need to create a synthetic Integration
-       it.SetAnnotations(map[string]string{
-               v1.IntegrationImportedNameLabel: adp.GetName(),
-               v1.IntegrationImportedKindLabel: adp.GetKind(),
-               v1.IntegrationSyntheticLabel:    "true",
-       })
-       it.Spec = v1.IntegrationSpec{
-               Traits: adp.GetTraits(),
-       }
-}
-
-// NonManagedCamelApplicationAdapter represents a Camel application built and 
deployed outside the operator lifecycle.
-type NonManagedCamelApplicationAdapter interface {
-       // GetName returns the name of the Camel application.
-       GetName() string
-       // GetKind returns the kind of the Camel application (ie, Deployment, 
Cronjob, ...).
-       GetKind() string
-       // GetTraits in used to retrieve the trait configuration.
-       GetTraits() v1.Traits
-       // GetIntegrationName return the name of the Integration which has to 
be imported.
-       GetIntegrationName() string
-       // GetIntegrationNameSpace return the namespace of the Integration 
which has to be imported.
-       GetIntegrationNameSpace() string
-       // GetAppObj return the object from which we're importing.
-       GetAppObj() ctrl.Object
-}
-
-// NonManagedCamelDeployment represents a regular Camel application built and 
deployed outside the operator lifecycle.
-type NonManagedCamelDeployment struct {
-       deploy *appsv1.Deployment
-}
-
-// GetName returns the name of the Camel application.
-func (app *NonManagedCamelDeployment) GetName() string {
-       return app.deploy.GetName()
-}
-
-// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, 
...).
-func (app *NonManagedCamelDeployment) GetKind() string {
-       return "Deployment"
-}
-
-// GetTraits in used to retrieve the trait configuration.
-func (app *NonManagedCamelDeployment) GetTraits() v1.Traits {
-       return v1.Traits{
-               Container: &trait.ContainerTrait{
-                       Name: app.getContainerNameFromDeployment(),
-               },
-       }
-}
-
-// GetAppObj return the object from which we're importing.
-func (app *NonManagedCamelDeployment) GetAppObj() ctrl.Object {
-       return app.deploy
-}
-
-// GetIntegrationName return the name of the Integration which has to be 
imported.
-func (app *NonManagedCamelDeployment) GetIntegrationName() string {
-       return app.deploy.Labels[v1.IntegrationLabel]
-}
-
-// GetIntegrationNameSpace return the namespace of the Integration which has 
to be imported.
-func (app *NonManagedCamelDeployment) GetIntegrationNameSpace() string {
-       return app.deploy.Namespace
-}
-
-// getContainerNameFromDeployment returns the container name which is running 
the Camel application.
-func (app *NonManagedCamelDeployment) getContainerNameFromDeployment() string {
-       firstContainerName := ""
-       for _, ct := range app.deploy.Spec.Template.Spec.Containers {
-               // set as fallback if no container is named as the deployment
-               if firstContainerName == "" {
-                       firstContainerName = app.deploy.Name
-               }
-               if ct.Name == app.deploy.Name {
-                       return app.deploy.Name
-               }
-       }
-       return firstContainerName
-}
-
-// NonManagedCamelCronjob represents a cron Camel application built and 
deployed outside the operator lifecycle.
-type NonManagedCamelCronjob struct {
-       cron *batchv1.CronJob
-}
-
-// GetName returns the name of the Camel application.
-func (app *NonManagedCamelCronjob) GetName() string {
-       return app.cron.GetName()
-}
-
-// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, 
...).
-func (app *NonManagedCamelCronjob) GetKind() string {
-       return "CronJob"
-}
-
-// GetTraits in used to retrieve the trait configuration.
-func (app *NonManagedCamelCronjob) GetTraits() v1.Traits {
-       return v1.Traits{}
-}
-
-// GetIntegrationName return the name of the Integration which has to be 
imported.
-func (app *NonManagedCamelCronjob) GetIntegrationName() string {
-       return app.cron.Labels[v1.IntegrationLabel]
-}
-
-// GetIntegrationNameSpace return the namespace of the Integration which has 
to be imported.
-func (app *NonManagedCamelCronjob) GetIntegrationNameSpace() string {
-       return app.cron.Namespace
-}
-
-// GetAppObj return the object from which we're importing.
-func (app *NonManagedCamelCronjob) GetAppObj() ctrl.Object {
-       return app.cron
-}
-
-// NonManagedCamelKnativeService represents a Knative Service based Camel 
application built and deployed outside the operator lifecycle.
-type NonManagedCamelKnativeService struct {
-       ksvc *servingv1.Service
-}
-
-// GetName returns the name of the Camel application.
-func (app *NonManagedCamelKnativeService) GetName() string {
-       return app.ksvc.GetName()
-}
-
-// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, 
...).
-func (app *NonManagedCamelKnativeService) GetKind() string {
-       return "KnativeService"
-}
-
-// GetTraits in used to retrieve the trait configuration.
-func (app *NonManagedCamelKnativeService) GetTraits() v1.Traits {
-       return v1.Traits{}
-}
-
-// GetIntegrationName return the name of the Integration which has to be 
imported.
-func (app *NonManagedCamelKnativeService) GetIntegrationName() string {
-       return app.ksvc.Labels[v1.IntegrationLabel]
-}
-
-// GetIntegrationNameSpace return the namespace of the Integration which has 
to be imported.
-func (app *NonManagedCamelKnativeService) GetIntegrationNameSpace() string {
-       return app.ksvc.Namespace
-}
-
-// GetAppObj return the object from which we're importing.
-func (app *NonManagedCamelKnativeService) GetAppObj() ctrl.Object {
-       return app.ksvc
-}
diff --git a/pkg/controller/integration/monitor.go 
b/pkg/controller/integration/monitor.go
index e2d3b32a3..fb86ed41d 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -60,7 +60,6 @@ func (action *monitorAction) CanHandle(integration 
*v1.Integration) bool {
        return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
                integration.Status.Phase == v1.IntegrationPhaseRunning ||
                integration.Status.Phase == v1.IntegrationPhaseError ||
-               integration.Status.Phase == v1.IntegrationPhaseImportMissing ||
                integration.Status.Phase == v1.IntegrationPhaseCannotMonitor
 }
 
diff --git a/pkg/controller/integration/monitor_synthetic.go 
b/pkg/controller/integration/monitor_synthetic.go
index a10a03deb..a1aa86a43 100644
--- a/pkg/controller/integration/monitor_synthetic.go
+++ b/pkg/controller/integration/monitor_synthetic.go
@@ -19,10 +19,8 @@ package integration
 
 import (
        "context"
-       "fmt"
 
        corev1 "k8s.io/api/core/v1"
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
 
        v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
        "github.com/apache/camel-k/v2/pkg/trait"
@@ -44,22 +42,6 @@ func (action *monitorSyntheticAction) Name() string {
 func (action *monitorSyntheticAction) Handle(ctx context.Context, integration 
*v1.Integration) (*v1.Integration, error) {
        environment, err := trait.NewSyntheticEnvironment(ctx, action.client, 
integration, nil)
        if err != nil {
-               if k8serrors.IsNotFound(err) {
-                       // Not an error: the resource from which we imported 
has been deleted, report in it status.
-                       // It may be a temporary situation, for example, if the 
deployment from which the Integration is imported
-                       // is being redeployed. For this reason we should keep 
the Integration instead of forcefully removing it.
-                       message := fmt.Sprintf(
-                               "import %s %s no longer available",
-                               
integration.Annotations[v1.IntegrationImportedKindLabel],
-                               
integration.Annotations[v1.IntegrationImportedNameLabel],
-                       )
-                       action.L.Info(message)
-                       integration.SetReadyConditionError(message)
-                       zero := int32(0)
-                       integration.Status.Phase = 
v1.IntegrationPhaseImportMissing
-                       integration.Status.Replicas = &zero
-                       return integration, nil
-               }
                // report the error
                integration.Status.Phase = v1.IntegrationPhaseError
                integration.SetReadyCondition(corev1.ConditionFalse, 
v1.IntegrationConditionImportingKindAvailableReason, err.Error())
diff --git a/pkg/controller/integration/predicate.go 
b/pkg/controller/integration/predicate.go
index 0feb71fec..79d61556a 100644
--- a/pkg/controller/integration/predicate.go
+++ b/pkg/controller/integration/predicate.go
@@ -21,7 +21,6 @@ import (
        "reflect"
 
        "k8s.io/apimachinery/pkg/api/equality"
-       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
 )
@@ -56,39 +55,3 @@ func (StatusChangedPredicate) Update(e event.UpdateEvent) 
bool {
 
        return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface())
 }
-
-// NonManagedObjectPredicate implements a generic update predicate function 
for managed object.
-type NonManagedObjectPredicate struct {
-       predicate.Funcs
-}
-
-// Create --.
-func (NonManagedObjectPredicate) Create(e event.CreateEvent) bool {
-       return !isManagedObject(e.Object)
-}
-
-// Update --.
-func (NonManagedObjectPredicate) Update(e event.UpdateEvent) bool {
-       return !isManagedObject(e.ObjectNew)
-}
-
-// Delete --.
-func (NonManagedObjectPredicate) Delete(e event.DeleteEvent) bool {
-       return !isManagedObject(e.Object)
-}
-
-// Generic --.
-func (NonManagedObjectPredicate) Generic(e event.GenericEvent) bool {
-       return !isManagedObject(e.Object)
-}
-
-// isManagedObject returns true if the object is managed by an Integration.
-func isManagedObject(obj ctrl.Object) bool {
-       for _, mr := range obj.GetOwnerReferences() {
-               if mr.APIVersion == "camel.apache.org/v1" &&
-                       mr.Kind == "Integration" {
-                       return true
-               }
-       }
-       return false
-}
diff --git a/pkg/controller/pipe/pipe_controller.go 
b/pkg/controller/pipe/pipe_controller.go
index 36da7fca1..5b174e435 100644
--- a/pkg/controller/pipe/pipe_controller.go
+++ b/pkg/controller/pipe/pipe_controller.go
@@ -66,7 +66,7 @@ func newReconciler(mgr manager.Manager, c client.Client) 
reconcile.Reconciler {
 }
 
 func add(mgr manager.Manager, r reconcile.Reconciler) error {
-       c, err := controller.New("kamelet-binding-controller", mgr, 
controller.Options{Reconciler: r})
+       c, err := controller.New("pipe-controller", mgr, 
controller.Options{Reconciler: r})
        if err != nil {
                return err
        }
diff --git a/pkg/controller/synthetic/synthetic.go 
b/pkg/controller/synthetic/synthetic.go
new file mode 100644
index 000000000..bd785d318
--- /dev/null
+++ b/pkg/controller/synthetic/synthetic.go
@@ -0,0 +1,300 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package synthetic
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+
+       v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
+       "github.com/apache/camel-k/v2/pkg/client"
+       "github.com/apache/camel-k/v2/pkg/platform"
+       "github.com/apache/camel-k/v2/pkg/util/kubernetes"
+       "github.com/apache/camel-k/v2/pkg/util/log"
+       appsv1 "k8s.io/api/apps/v1"
+       batchv1 "k8s.io/api/batch/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       clientgocache "k8s.io/client-go/tools/cache"
+       "knative.dev/serving/pkg/apis/serving"
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+       "sigs.k8s.io/controller-runtime/pkg/cache"
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// ManageSyntheticIntegrations is the controller for synthetic Integrations. 
Consider that the lifecycle of the objects are driven
+// by the way we are monitoring them. Since we're filtering by 
`camel.apache.org/integration` label in the cached clinet,
+// you must consider an add, update or delete
+// accordingly, ie, when the user label the resource, then it is considered as 
an add, when it removes the label, it is considered as a delete.
+// We must filter only non managed objects in order to avoid to conflict with 
the reconciliation loop of managed objects (owned by an Integration).
+func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache 
cache.Cache, reader ctrl.Reader) error {
+       informers, err := getInformers(ctx, c, cache)
+       if err != nil {
+               return err
+       }
+       for _, informer := range informers {
+               _, err := 
informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{
+                       AddFunc: func(obj interface{}) {
+                               ctrlObj, ok := obj.(ctrl.Object)
+                               if !ok {
+                                       log.Error(fmt.Errorf("type assertion 
failed: %v", obj), "Failed to retrieve Object on add event")
+                                       return
+                               }
+                               if !isManagedObject(ctrlObj) {
+                                       integrationName := 
ctrlObj.GetLabels()[v1.IntegrationLabel]
+                                       it, err := getSyntheticIntegration(ctx, 
c, ctrlObj.GetNamespace(), integrationName)
+                                       if err != nil {
+                                               if k8serrors.IsNotFound(err) {
+                                                       adapter, err := 
nonManagedCamelApplicationFactory(ctrlObj)
+                                                       if err != nil {
+                                                               log.Errorf(err, 
"Some error happened while creating a Camel application adapter for %s", 
integrationName)
+                                                       }
+                                                       if err = 
createSyntheticIntegration(ctx, c, adapter.Integration()); err != nil {
+                                                               log.Errorf(err, 
"Some error happened while creating a synthetic Integration %s", 
integrationName)
+                                                       }
+                                                       log.Infof("Created a 
synthetic Integration %s after %s resource object", it.GetName(), 
ctrlObj.GetName())
+                                               } else {
+                                                       log.Errorf(err, "Some 
error happened while loading a synthetic Integration %s", integrationName)
+                                               }
+                                       } else {
+                                               if it.Status.Phase == 
v1.IntegrationPhaseImportMissing {
+                                                       // Update with proper 
phase (reconciliation will take care)
+                                                       it.Status.Phase = 
v1.IntegrationPhaseNone
+                                                       if err = 
updateSyntheticIntegration(ctx, c, it); err != nil {
+                                                               log.Errorf(err, 
"Some error happened while updatinf a synthetic Integration %s", 
integrationName)
+                                                       }
+                                               } else {
+                                                       log.Infof("Synthetic 
Integration %s is in phase %s. Skipping.", integrationName, it.Status.Phase)
+                                               }
+                                       }
+                               }
+                       },
+                       DeleteFunc: func(obj interface{}) {
+                               ctrlObj, ok := obj.(ctrl.Object)
+                               if !ok {
+                                       log.Error(fmt.Errorf("type assertion 
failed: %v", obj), "Failed to retrieve Object on delete event")
+                                       return
+                               }
+                               if !isManagedObject(ctrlObj) {
+                                       integrationName := 
ctrlObj.GetLabels()[v1.IntegrationLabel]
+                                       // We must use a non caching client to 
understand if the object has been deleted from the cluster or only deleted from
+                                       // the cache (ie, user removed the 
importing label)
+                                       err := reader.Get(ctx, 
ctrl.ObjectKeyFromObject(ctrlObj), ctrlObj)
+                                       if err != nil {
+                                               if k8serrors.IsNotFound(err) {
+                                                       // Object removed from 
the cluster
+                                                       it, err := 
getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName)
+                                                       if err != nil {
+                                                               log.Errorf(err, 
"Some error happened while loading a synthetic Integration %s", it.Name)
+                                                               return
+                                                       }
+                                                       // The resource from 
which we imported has been deleted, report in it status.
+                                                       // It may be a 
temporary situation, for example, if the deployment from which the Integration 
is imported
+                                                       // is being redeployed. 
For this reason we should keep the Integration instead of forcefully removing 
it.
+                                                       message := fmt.Sprintf(
+                                                               "import %s %s 
no longer available",
+                                                               
it.Annotations[v1.IntegrationImportedKindLabel],
+                                                               
it.Annotations[v1.IntegrationImportedNameLabel],
+                                                       )
+                                                       
it.SetReadyConditionError(message)
+                                                       zero := int32(0)
+                                                       it.Status.Phase = 
v1.IntegrationPhaseImportMissing
+                                                       it.Status.Replicas = 
&zero
+                                                       if err = 
updateSyntheticIntegration(ctx, c, it); err != nil {
+                                                               log.Errorf(err, 
"Some error happened while updating a synthetic Integration %s", it.Name)
+                                                       }
+                                                       log.Infof("Updated 
synthetic Integration %s with status %s", it.GetName(), it.Status.Phase)
+                                               } else {
+                                                       log.Errorf(err, "Some 
error happened while loading object %s from the cluster", ctrlObj.GetName())
+                                                       return
+                                               }
+                                       } else {
+                                               // Importing label removed
+                                               if err = 
deleteSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName); 
err != nil {
+                                                       log.Errorf(err, "Some 
error happened while deleting a synthetic Integration %s", integrationName)
+                                               }
+                                               log.Infof("Deleted synthetic 
Integration %s", integrationName)
+                                       }
+                               }
+                       },
+               })
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func getInformers(ctx context.Context, cl client.Client, c cache.Cache) 
([]cache.Informer, error) {
+       deploy, err := c.GetInformer(ctx, &appsv1.Deployment{})
+       if err != nil {
+               return nil, err
+       }
+       informers := []cache.Informer{deploy}
+       // Watch for the CronJob conditionally
+       if ok, err := kubernetes.IsAPIResourceInstalled(cl, 
batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); 
ok && err == nil {
+               cron, err := c.GetInformer(ctx, &batchv1.CronJob{})
+               if err != nil {
+                       return nil, err
+               }
+               informers = append(informers, cron)
+       }
+       // Watch for the Knative Services conditionally
+       if ok, err := kubernetes.IsAPIResourceInstalled(cl, 
servingv1.SchemeGroupVersion.String(), 
reflect.TypeOf(servingv1.Service{}).Name()); ok && err == nil {
+               if ok, err := kubernetes.CheckPermission(ctx, cl, 
serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", 
"watch"); ok && err == nil {
+                       ksvc, err := c.GetInformer(ctx, &servingv1.Service{})
+                       if err != nil {
+                               return nil, err
+                       }
+                       informers = append(informers, ksvc)
+               }
+       }
+
+       return informers, nil
+}
+
+func getSyntheticIntegration(ctx context.Context, c client.Client, namespace, 
name string) (*v1.Integration, error) {
+       it := v1.NewIntegration(namespace, name)
+       err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it)
+       return &it, err
+}
+
+func createSyntheticIntegration(ctx context.Context, c client.Client, it 
*v1.Integration) error {
+       return c.Create(ctx, it, ctrl.FieldOwner("camel-k-operator"))
+}
+
+func deleteSyntheticIntegration(ctx context.Context, c client.Client, 
namespace, name string) error {
+       // As the Integration label was removed, we don't know which is the 
Synthetic integration to remove
+       it := v1.NewIntegration(namespace, name)
+       return c.Delete(ctx, &it)
+}
+
+func updateSyntheticIntegration(ctx context.Context, c client.Client, it 
*v1.Integration) error {
+       return c.Status().Update(ctx, it, ctrl.FieldOwner("camel-k-operator"))
+}
+
+// isManagedObject returns true if the object is managed by an Integration.
+func isManagedObject(obj ctrl.Object) bool {
+       for _, mr := range obj.GetOwnerReferences() {
+               if mr.APIVersion == "camel.apache.org/v1" &&
+                       mr.Kind == "Integration" {
+                       return true
+               }
+       }
+       return false
+}
+
+// nonManagedCamelApplicationAdapter represents a Camel application built and 
deployed outside the operator lifecycle.
+type nonManagedCamelApplicationAdapter interface {
+       // Integration return an Integration resource fed by the Camel 
application adapter.
+       Integration() *v1.Integration
+}
+
+func nonManagedCamelApplicationFactory(obj ctrl.Object) 
(nonManagedCamelApplicationAdapter, error) {
+       deploy, ok := obj.(*appsv1.Deployment)
+       if ok {
+               return &nonManagedCamelDeployment{deploy: deploy}, nil
+       }
+       cronjob, ok := obj.(*batchv1.CronJob)
+       if ok {
+               return &NonManagedCamelCronjob{cron: cronjob}, nil
+       }
+       ksvc, ok := obj.(*servingv1.Service)
+       if ok {
+               return &NonManagedCamelKnativeService{ksvc: ksvc}, nil
+       }
+       return nil, fmt.Errorf("unsupported %s object kind", obj)
+}
+
+// NonManagedCamelDeployment represents a regular Camel application built and 
deployed outside the operator lifecycle.
+type nonManagedCamelDeployment struct {
+       deploy *appsv1.Deployment
+}
+
+// Integration return an Integration resource fed by the Camel application 
adapter.
+func (app *nonManagedCamelDeployment) Integration() *v1.Integration {
+       it := v1.NewIntegration(app.deploy.Namespace, 
app.deploy.Labels[v1.IntegrationLabel])
+       it.SetAnnotations(map[string]string{
+               v1.IntegrationImportedNameLabel: app.deploy.Name,
+               v1.IntegrationImportedKindLabel: "Deployment",
+               v1.IntegrationSyntheticLabel:    "true",
+       })
+       it.Spec = v1.IntegrationSpec{
+               Traits: v1.Traits{
+                       Container: &trait.ContainerTrait{
+                               Name: app.getContainerNameFromDeployment(),
+                       },
+               },
+       }
+       return &it
+}
+
+// getContainerNameFromDeployment returns the container name which is running 
the Camel application.
+func (app *nonManagedCamelDeployment) getContainerNameFromDeployment() string {
+       firstContainerName := ""
+       for _, ct := range app.deploy.Spec.Template.Spec.Containers {
+               // set as fallback if no container is named as the deployment
+               if firstContainerName == "" {
+                       firstContainerName = app.deploy.Name
+               }
+               if ct.Name == app.deploy.Name {
+                       return app.deploy.Name
+               }
+       }
+       return firstContainerName
+}
+
+// NonManagedCamelCronjob represents a cron Camel application built and 
deployed outside the operator lifecycle.
+type NonManagedCamelCronjob struct {
+       cron *batchv1.CronJob
+}
+
+// Integration return an Integration resource fed by the Camel application 
adapter.
+func (app *NonManagedCamelCronjob) Integration() *v1.Integration {
+       it := v1.NewIntegration(app.cron.Namespace, 
app.cron.Labels[v1.IntegrationLabel])
+       it.SetAnnotations(map[string]string{
+               v1.IntegrationImportedNameLabel: app.cron.Name,
+               v1.IntegrationImportedKindLabel: "CronJob",
+               v1.IntegrationSyntheticLabel:    "true",
+       })
+       it.Spec = v1.IntegrationSpec{
+               Traits: v1.Traits{},
+       }
+       return &it
+}
+
+// NonManagedCamelKnativeService represents a Knative Service based Camel 
application built and deployed outside the operator lifecycle.
+type NonManagedCamelKnativeService struct {
+       ksvc *servingv1.Service
+}
+
+// Integration return an Integration resource fed by the Camel application 
adapter.
+func (app *NonManagedCamelKnativeService) Integration() *v1.Integration {
+       it := v1.NewIntegration(app.ksvc.Namespace, 
app.ksvc.Labels[v1.IntegrationLabel])
+       it.SetAnnotations(map[string]string{
+               v1.IntegrationImportedNameLabel: app.ksvc.Name,
+               v1.IntegrationImportedKindLabel: "KnativeService",
+               v1.IntegrationSyntheticLabel:    "true",
+       })
+       it.Spec = v1.IntegrationSpec{
+               Traits: v1.Traits{},
+       }
+       return &it
+}

Reply via email to