This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit dbea5cc5800b4cc03be7e7c46193123c9c071393 Author: Pasquale Congiusti <pasquale.congiu...@gmail.com> AuthorDate: Wed Nov 29 17:25:35 2023 +0100 feat: import external Camel applications --- config/rbac/namespaced/operator-role-knative.yaml | 8 + pkg/apis/camel/v1/integration_types.go | 18 +- pkg/apis/camel/v1/integration_types_support.go | 15 ++ pkg/cmd/operator/operator.go | 1 - pkg/controller/integration/initialize.go | 87 +++++++ .../integration/integration_controller.go | 173 ++++++++++---- .../integration/integration_controller_import.go | 249 +++++++++++++++++++++ pkg/controller/integration/monitor.go | 54 ++++- pkg/controller/integration/monitor_cronjob.go | 16 ++ pkg/controller/integration/monitor_deployment.go | 13 ++ pkg/controller/integration/monitor_knative.go | 17 ++ pkg/controller/integration/monitor_synthetic.go | 70 ++++++ pkg/controller/integration/predicate.go | 37 +++ pkg/trait/camel.go | 3 +- pkg/trait/platform.go | 3 +- pkg/trait/trait.go | 71 +++++- 16 files changed, 770 insertions(+), 65 deletions(-) diff --git a/config/rbac/namespaced/operator-role-knative.yaml b/config/rbac/namespaced/operator-role-knative.yaml index 3cba80931..7e1d2f349 100644 --- a/config/rbac/namespaced/operator-role-knative.yaml +++ b/config/rbac/namespaced/operator-role-knative.yaml @@ -35,6 +35,14 @@ rules: - patch - update - watch +- apiGroups: + - serving.knative.dev + resources: + - revisions + verbs: + - get + - list + - watch - apiGroups: - eventing.knative.dev resources: diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go index 78dd40a8c..9bcecaad2 100644 --- a/pkg/apis/camel/v1/integration_types.go +++ b/pkg/apis/camel/v1/integration_types.go @@ -155,7 +155,13 @@ const ( IntegrationPhaseRunning IntegrationPhase = "Running" // IntegrationPhaseError --. IntegrationPhaseError IntegrationPhase = "Error" + // IntegrationPhaseImportMissing used when the application from which the Integration is imported has been deleted. + IntegrationPhaseImportMissing IntegrationPhase = "Application Missing" + // IntegrationPhaseCannotMonitor used when the application from which the Integration has not enough information to monitor its pods. + IntegrationPhaseCannotMonitor IntegrationPhase = "Cannot Monitor Pods" + // IntegrationConditionReady --. + IntegrationConditionReady IntegrationConditionType = "Ready" // IntegrationConditionKitAvailable --. IntegrationConditionKitAvailable IntegrationConditionType = "IntegrationKitAvailable" // IntegrationConditionPlatformAvailable --. @@ -178,10 +184,11 @@ const ( IntegrationConditionJolokiaAvailable IntegrationConditionType = "JolokiaAvailable" // IntegrationConditionProbesAvailable --. IntegrationConditionProbesAvailable IntegrationConditionType = "ProbesAvailable" - // IntegrationConditionReady --. - IntegrationConditionReady IntegrationConditionType = "Ready" // IntegrationConditionTraitInfo --. IntegrationConditionTraitInfo IntegrationConditionType = "TraitInfo" + // IntegrationConditionMonitoringPodsAvailable used to specify that the Pods generated are available for monitoring. + IntegrationConditionMonitoringPodsAvailable IntegrationConditionType = "MonitoringPodsAvailable" + // IntegrationConditionKitAvailableReason --. IntegrationConditionKitAvailableReason string = "IntegrationKitAvailable" // IntegrationConditionPlatformAvailableReason --. @@ -220,7 +227,8 @@ const ( IntegrationConditionJolokiaAvailableReason string = "JolokiaAvailable" // IntegrationConditionProbesAvailableReason --. IntegrationConditionProbesAvailableReason string = "ProbesAvailable" - + // IntegrationConditionMonitoringPodsAvailableReason used to specify that the Pods generated are available for monitoring. + IntegrationConditionMonitoringPodsAvailableReason string = "MonitoringPodsAvailable" // IntegrationConditionKnativeServiceReadyReason --. IntegrationConditionKnativeServiceReadyReason string = "KnativeServiceReady" // IntegrationConditionDeploymentReadyReason --. @@ -239,18 +247,18 @@ const ( IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady" // IntegrationConditionErrorReason --. IntegrationConditionErrorReason string = "Error" - // IntegrationConditionInitializationFailedReason --. IntegrationConditionInitializationFailedReason string = "InitializationFailed" // IntegrationConditionUnsupportedLanguageReason --. IntegrationConditionUnsupportedLanguageReason string = "UnsupportedLanguage" - // IntegrationConditionKameletsAvailable --. IntegrationConditionKameletsAvailable IntegrationConditionType = "KameletsAvailable" // IntegrationConditionKameletsAvailableReason --. IntegrationConditionKameletsAvailableReason string = "KameletsAvailable" // IntegrationConditionKameletsNotAvailableReason --. IntegrationConditionKameletsNotAvailableReason string = "KameletsNotAvailable" + // IntegrationConditionImportingKindAvailableReason used (as false) if we're trying to import an unsupported kind. + IntegrationConditionImportingKindAvailableReason string = "ImportingKindAvailable" ) // IntegrationCondition describes the state of a resource at a certain point. diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go index ef24e207b..3342be76a 100644 --- a/pkg/apis/camel/v1/integration_types_support.go +++ b/pkg/apis/camel/v1/integration_types_support.go @@ -25,8 +25,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// IntegrationLabel is used to tag k8s object created by a given Integration. const IntegrationLabel = "camel.apache.org/integration" +// IntegrationSyntheticLabel is used to tag k8s synthetic Integrations. +const IntegrationSyntheticLabel = "camel.apache.org/is-synthetic" + +// IntegrationImportedKindLabel specifies from what kind of resource an Integration was imported. +const IntegrationImportedKindLabel = "camel.apache.org/imported-from-kind" + +// IntegrationImportedNameLabel specifies from what resource an Integration was imported. +const IntegrationImportedNameLabel = "camel.apache.org/imported-from-name" + func NewIntegration(namespace string, name string) Integration { return Integration{ TypeMeta: metav1.TypeMeta{ @@ -283,6 +293,11 @@ func (in *Integration) SetReadyConditionError(err string) { in.SetReadyCondition(corev1.ConditionFalse, IntegrationConditionErrorReason, err) } +// IsSynthetic returns true for synthetic Integrations (non managed, likely imported from external deployments). +func (in *Integration) IsSynthetic() bool { + return in.Annotations[IntegrationSyntheticLabel] == "true" +} + // GetCondition returns the condition with the provided type. func (in *IntegrationStatus) GetCondition(condType IntegrationConditionType) *IntegrationCondition { for i := range in.Conditions { diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 04b5ea8b2..ab59ab638 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -188,7 +188,6 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID selector := labels.NewSelector().Add(*hasIntegrationLabel) selectors := map[ctrl.Object]cache.ByObject{ - &corev1.Pod{}: {Label: selector}, &appsv1.Deployment{}: {Label: selector}, &batchv1.Job{}: {Label: selector}, &servingv1.Service{}: {Label: selector}, diff --git a/pkg/controller/integration/initialize.go b/pkg/controller/integration/initialize.go index a08dd28c6..ad8891647 100644 --- a/pkg/controller/integration/initialize.go +++ b/pkg/controller/integration/initialize.go @@ -19,6 +19,7 @@ package integration import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -53,6 +54,10 @@ func (action *initializeAction) CanHandle(integration *v1.Integration) bool { func (action *initializeAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { action.L.Info("Initializing Integration") + if integration.Annotations[v1.IntegrationImportedNameLabel] != "" { + return action.importFromExternalApp(integration) + } + if _, err := trait.Apply(ctx, action.client, integration, nil); err != nil { integration.Status.Phase = v1.IntegrationPhaseError integration.SetReadyCondition(corev1.ConditionFalse, @@ -91,3 +96,85 @@ func (action *initializeAction) Handle(ctx context.Context, integration *v1.Inte return integration, nil } + +func (action *initializeAction) importFromExternalApp(integration *v1.Integration) (*v1.Integration, error) { + readyMessage := fmt.Sprintf( + "imported from %s %s", + integration.Annotations[v1.IntegrationImportedNameLabel], + integration.Annotations[v1.IntegrationImportedKindLabel], + ) + // We need to set the condition for which this Integration is imported (required later by monitoring) + integration.Status.SetConditions( + getCamelAppImportingCondition( + integration.Annotations[v1.IntegrationImportedKindLabel], + readyMessage, + )..., + ) + // If it's ready, then we can safely assume the integration is running + if integration.IsConditionTrue(v1.IntegrationConditionReady) { + integration.Status.Phase = v1.IntegrationPhaseRunning + } else { + integration.Status.Phase = v1.IntegrationPhaseError + } + + return integration, nil +} + +func getCamelAppImportingCondition(kind, message string) []v1.IntegrationCondition { + switch kind { + case "Deployment": + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionDeploymentAvailable, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionDeploymentAvailableReason, + Message: message, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionDeploymentReadyReason, + Message: message, + }, + } + case "CronJob": + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionCronJobAvailable, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionCronJobCreatedReason, + Message: message, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionDeploymentReadyReason, + Message: message, + }, + } + case "KnativeService": + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionKnativeServiceAvailable, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionKnativeServiceAvailableReason, + Message: message, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionKnativeServiceReadyReason, + Message: message, + }, + } + default: + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionFalse, + Reason: v1.IntegrationConditionImportingKindAvailableReason, + Message: fmt.Sprintf("Unsupported %s import kind", kind), + }, + } + } +} diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index a70a8713b..c3dcd30f4 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -27,12 +27,12 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/builder" ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -324,30 +324,47 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. // Evaluates to false if the object has been confirmed deleted return !e.DeleteStateUnknown }, - })). - // Watch for IntegrationKit phase transitioning to ready or error, and - // enqueue requests for any integration that matches the kit, in building - // or running phase. - Watches(&v1.IntegrationKit{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { - kit, ok := a.(*v1.IntegrationKit) - if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list") - return []reconcile.Request{} - } + })) + // Watch for all the resources + watchIntegrationResources(c, b) + // Watch for the CronJob conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + watchCronJobResources(c, b) + } + // Watch for the Knative Services conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil { + return err + } else if ok { + if err = watchKnativeResources(ctx, c, b); err != nil { + return err + } + } - return integrationKitEnqueueRequestsFromMapFunc(ctx, c, kit) - })). + return b.Complete(r) +} + +func watchIntegrationResources(c client.Client, b *builder.Builder) { + // Watch for IntegrationKit phase transitioning to ready or error, and + // enqueue requests for any integration that matches the kit, in building + // or running phase. + b.Watches(&v1.IntegrationKit{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { + kit, ok := a.(*v1.IntegrationKit) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve IntegrationKit") + return []reconcile.Request{} + } + return integrationKitEnqueueRequestsFromMapFunc(ctx, c, kit) + })). // Watch for IntegrationPlatform phase transitioning to ready and enqueue // requests for any integrations that are in phase waiting for platform Watches(&v1.IntegrationPlatform{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { p, ok := a.(*v1.IntegrationPlatform) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to list integrations") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve IntegrationPlatform") return []reconcile.Request{} } - return integrationPlatformEnqueueRequestsFromMapFunc(ctx, c, p) })). // Watch for Configmaps or Secret used in the Integrations for updates @@ -355,30 +372,29 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { cm, ok := a.(*corev1.ConfigMap) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Configmap") return []reconcile.Request{} } - return configmapEnqueueRequestsFromMapFunc(ctx, c, cm) })). Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { secret, ok := a.(*corev1.Secret) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Secret") return []reconcile.Request{} } - return secretEnqueueRequestsFromMapFunc(ctx, c, secret) })). - // Watch for the owned Deployments - Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})). - // Watch for the Integration Pods + // Watch for the Integration Pods belonging to managed Integrations Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { pod, ok := a.(*corev1.Pod) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to list integration pods") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Pod") + return []reconcile.Request{} + } + if pod.Labels[v1.IntegrationLabel] == "" { return []reconcile.Request{} } return []reconcile.Request{ @@ -389,36 +405,90 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. }, }, } - })) + })). + // Watch for non managed Deployments (ie, imported) + Watches(&appsv1.Deployment{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { + deploy, ok := a.(*appsv1.Deployment) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Deployment") + return []reconcile.Request{} + } + return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelDeployment{deploy: deploy}) + }), + builder.WithPredicates(NonManagedObjectPredicate{}), + ). + // Watch for the owned Deployments + Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})) +} - if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { +func watchCronJobResources(c client.Client, b *builder.Builder) { + // Watch for non managed Deployments (ie, imported) + b.Watches(&batchv1.CronJob{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { + cron, ok := a.(*batchv1.CronJob) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve CronJob") + return []reconcile.Request{} + } + return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelCronjob{cron: cron}) + }), + builder.WithPredicates(NonManagedObjectPredicate{}), + ). // Watch for the owned CronJobs - b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})) - } + Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})) +} - // Watch for the owned Knative Services conditionally - if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil { +func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error { + // Check for permission to watch the Knative Service resource + checkCtx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + if ok, err := kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil { return err } else if ok { - // Check for permission to watch the Knative Service resource - checkCtx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - if ok, err = kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil { - return err - } else if ok { - log.Info("KnativeService resources installed in the cluster. RBAC privileges assigned correctly, you can use Knative features.") - b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{})) - } else { - log.Info(` KnativeService resources installed in the cluster. However Camel K operator has not the required RBAC privileges. You can't use Knative features. - Make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for Camel K managed Knative Services.`) - } - } else { - log.Info(`KnativeService resources are not installed in the cluster. You can't use Knative features. If you install Knative Serving resources after the - Camel K operator, make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for - Camel K managed Knative Services.`) + // Watch for non managed Knative Service (ie, imported) + b.Watches(&servingv1.Service{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { + ksvc, ok := a.(*servingv1.Service) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService") + return []reconcile.Request{} + } + return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc}) + }), + builder.WithPredicates(NonManagedObjectPredicate{}), + ). + // We must watch also Revisions, since it's the object that really change when a Knative service scales up and down + Watches(&servingv1.Revision{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { + revision, ok := a.(*servingv1.Revision) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService Revision") + return []reconcile.Request{} + } + ksvc := &servingv1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: servingv1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: revision.Labels["serving.knative.dev/service"], + Namespace: revision.Namespace, + }, + } + err := c.Get(ctx, ctrl.ObjectKeyFromObject(ksvc), ksvc) + if err != nil { + // The revision does not belong to any managed (owned or imported) KnativeService, just discard + return []reconcile.Request{} + } + return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc}) + }), + builder.WithPredicates(NonManagedObjectPredicate{}), + ). + // Watch for the owned CronJobs + Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{})) } - - return b.Complete(r) + return nil } var _ reconcile.Reconciler = &reconcileIntegration{} @@ -476,7 +546,12 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile. NewPlatformSetupAction(), NewInitializeAction(), newBuildKitAction(), - NewMonitorAction(), + } + + if instance.IsSynthetic() { + actions = append(actions, NewMonitorSyntheticAction()) + } else { + actions = append(actions, NewMonitorAction()) } for _, a := range actions { diff --git a/pkg/controller/integration/integration_controller_import.go b/pkg/controller/integration/integration_controller_import.go new file mode 100644 index 000000000..403185509 --- /dev/null +++ b/pkg/controller/integration/integration_controller_import.go @@ -0,0 +1,249 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/util/log" + "github.com/apache/camel-k/v2/pkg/util/patch" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// nonManagedCamelAppEnqueueRequestsFromMapFunc represent the function to discover the Integration which has to be woke up: it creates a synthetic +// Integration if the Integration does not exist. This is used to import external Camel applications. +func nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, adp NonManagedCamelApplicationAdapter) []reconcile.Request { + if adp.GetIntegrationName() == "" { + return []reconcile.Request{} + } + it := v1.NewIntegration(adp.GetIntegrationNameSpace(), adp.GetIntegrationName()) + err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it) + if err != nil { + if k8serrors.IsNotFound(err) { + // We must perform this check to make sure the resource is not being deleted. + // In such case it makes no sense to create an Integration after it. + err := c.Get(ctx, ctrl.ObjectKeyFromObject(adp.GetAppObj()), adp.GetAppObj()) + if err != nil { + if k8serrors.IsNotFound(err) { + return []reconcile.Request{} + } + log.Errorf(err, "Some error happened while trying to get %s %s resource", adp.GetName(), adp.GetKind()) + } + createSyntheticIntegration(&it, adp) + target, err := patch.ApplyPatch(&it) + if err == nil { + err = c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) + if err != nil { + log.Errorf(err, "Some error happened while creating a synthetic Integration after %s %s resource", adp.GetName(), adp.GetKind()) + return []reconcile.Request{} + } + log.Infof( + "Created a synthetic Integration %s after %s %s", + it.GetName(), + adp.GetName(), + adp.GetKind(), + ) + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: it.Namespace, + Name: it.Name, + }, + }, + } + } + if err != nil { + log.Infof("Could not create Integration %s: %s", adp.GetIntegrationName(), err.Error()) + return []reconcile.Request{} + } + } + log.Errorf(err, "Could not get Integration %s", it.GetName()) + return []reconcile.Request{} + } + + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: it.Namespace, + Name: it.Name, + }, + }, + } +} + +// createSyntheticIntegration set all required values for a synthetic Integration. +func createSyntheticIntegration(it *v1.Integration, adp NonManagedCamelApplicationAdapter) { + // We need to create a synthetic Integration + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: adp.GetName(), + v1.IntegrationImportedKindLabel: adp.GetKind(), + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: adp.GetTraits(), + } +} + +// NonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelApplicationAdapter interface { + // GetName returns the name of the Camel application. + GetName() string + // GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). + GetKind() string + // GetTraits in used to retrieve the trait configuration. + GetTraits() v1.Traits + // GetIntegrationName return the name of the Integration which has to be imported. + GetIntegrationName() string + // GetIntegrationNameSpace return the namespace of the Integration which has to be imported. + GetIntegrationNameSpace() string + // GetAppObj return the object from which we're importing. + GetAppObj() ctrl.Object +} + +// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelDeployment struct { + deploy *appsv1.Deployment +} + +// GetName returns the name of the Camel application. +func (app *NonManagedCamelDeployment) GetName() string { + return app.deploy.GetName() +} + +// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). +func (app *NonManagedCamelDeployment) GetKind() string { + return "Deployment" +} + +// GetTraits in used to retrieve the trait configuration. +func (app *NonManagedCamelDeployment) GetTraits() v1.Traits { + return v1.Traits{ + Container: &trait.ContainerTrait{ + Name: app.getContainerNameFromDeployment(), + }, + } +} + +// GetAppObj return the object from which we're importing. +func (app *NonManagedCamelDeployment) GetAppObj() ctrl.Object { + return app.deploy +} + +// GetIntegrationName return the name of the Integration which has to be imported. +func (app *NonManagedCamelDeployment) GetIntegrationName() string { + return app.deploy.Labels[v1.IntegrationLabel] +} + +// GetIntegrationNameSpace return the namespace of the Integration which has to be imported. +func (app *NonManagedCamelDeployment) GetIntegrationNameSpace() string { + return app.deploy.Namespace +} + +// getContainerNameFromDeployment returns the container name which is running the Camel application. +func (app *NonManagedCamelDeployment) getContainerNameFromDeployment() string { + firstContainerName := "" + for _, ct := range app.deploy.Spec.Template.Spec.Containers { + // set as fallback if no container is named as the deployment + if firstContainerName == "" { + firstContainerName = app.deploy.Name + } + if ct.Name == app.deploy.Name { + return app.deploy.Name + } + } + return firstContainerName +} + +// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelCronjob struct { + cron *batchv1.CronJob +} + +// GetName returns the name of the Camel application. +func (app *NonManagedCamelCronjob) GetName() string { + return app.cron.GetName() +} + +// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). +func (app *NonManagedCamelCronjob) GetKind() string { + return "CronJob" +} + +// GetTraits in used to retrieve the trait configuration. +func (app *NonManagedCamelCronjob) GetTraits() v1.Traits { + return v1.Traits{} +} + +// GetIntegrationName return the name of the Integration which has to be imported. +func (app *NonManagedCamelCronjob) GetIntegrationName() string { + return app.cron.Labels[v1.IntegrationLabel] +} + +// GetIntegrationNameSpace return the namespace of the Integration which has to be imported. +func (app *NonManagedCamelCronjob) GetIntegrationNameSpace() string { + return app.cron.Namespace +} + +// GetAppObj return the object from which we're importing. +func (app *NonManagedCamelCronjob) GetAppObj() ctrl.Object { + return app.cron +} + +// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelKnativeService struct { + ksvc *servingv1.Service +} + +// GetName returns the name of the Camel application. +func (app *NonManagedCamelKnativeService) GetName() string { + return app.ksvc.GetName() +} + +// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). +func (app *NonManagedCamelKnativeService) GetKind() string { + return "KnativeService" +} + +// GetTraits in used to retrieve the trait configuration. +func (app *NonManagedCamelKnativeService) GetTraits() v1.Traits { + return v1.Traits{} +} + +// GetIntegrationName return the name of the Integration which has to be imported. +func (app *NonManagedCamelKnativeService) GetIntegrationName() string { + return app.ksvc.Labels[v1.IntegrationLabel] +} + +// GetIntegrationNameSpace return the namespace of the Integration which has to be imported. +func (app *NonManagedCamelKnativeService) GetIntegrationNameSpace() string { + return app.ksvc.Namespace +} + +// GetAppObj return the object from which we're importing. +func (app *NonManagedCamelKnativeService) GetAppObj() ctrl.Object { + return app.ksvc +} diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 9a6208fcb..5630f09c7 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -28,6 +28,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" @@ -43,6 +44,7 @@ import ( utilResource "github.com/apache/camel-k/v2/pkg/util/resource" ) +// NewMonitorAction is an action used to monitor manager Integrations. func NewMonitorAction() Action { return &monitorAction{} } @@ -58,7 +60,9 @@ func (action *monitorAction) Name() string { func (action *monitorAction) CanHandle(integration *v1.Integration) bool { return integration.Status.Phase == v1.IntegrationPhaseDeploying || integration.Status.Phase == v1.IntegrationPhaseRunning || - integration.Status.Phase == v1.IntegrationPhaseError + integration.Status.Phase == v1.IntegrationPhaseError || + integration.Status.Phase == v1.IntegrationPhaseImportMissing || + integration.Status.Phase == v1.IntegrationPhaseCannotMonitor } func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { @@ -124,16 +128,51 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra return nil, err } + return action.monitorPods(ctx, environment, integration) +} + +func (action *monitorAction) monitorPods(ctx context.Context, environment *trait.Environment, integration *v1.Integration) (*v1.Integration, error) { + controller, err := action.newController(environment, integration) + if err != nil { + return nil, err + } + if controller.isEmptySelector() { + // This is happening when the Deployment, CronJob, etc resources + // have no selector or labels to identify sibling Pods. + integration.Status.Phase = v1.IntegrationPhaseCannotMonitor + integration.Status.SetConditions( + v1.IntegrationCondition{ + Type: v1.IntegrationConditionMonitoringPodsAvailable, + Status: corev1.ConditionFalse, + Reason: v1.IntegrationConditionMonitoringPodsAvailableReason, + Message: fmt.Sprintf("Could not find any selector for %s. Make sure to include any label in the template and the Pods generated to inherit such label for monitoring purposes.", controller.getControllerName()), + }, + ) + return integration, nil + } + + controllerSelector := controller.getSelector() + selector, err := metav1.LabelSelectorAsSelector(&controllerSelector) + if err != nil { + return nil, err + } + integration.Status.SetConditions( + v1.IntegrationCondition{ + Type: v1.IntegrationConditionMonitoringPodsAvailable, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionMonitoringPodsAvailableReason, + }, + ) // Enforce the scale sub-resource label selector. // It is used by the HPA that queries the scale sub-resource endpoint, // to list the pods owned by the integration. - integration.Status.Selector = v1.IntegrationLabel + "=" + integration.Name + integration.Status.Selector = selector.String() // Update the replicas count pendingPods := &corev1.PodList{} err = action.client.List(ctx, pendingPods, ctrl.InNamespace(integration.Namespace), - ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name}, + &ctrl.ListOptions{LabelSelector: selector}, ctrl.MatchingFields{"status.phase": string(corev1.PodPending)}) if err != nil { return nil, err @@ -141,7 +180,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra runningPods := &corev1.PodList{} err = action.client.List(ctx, runningPods, ctrl.InNamespace(integration.Namespace), - ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name}, + &ctrl.ListOptions{LabelSelector: selector}, ctrl.MatchingFields{"status.phase": string(corev1.PodRunning)}) if err != nil { return nil, err @@ -161,7 +200,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra integration.Status.Phase = v1.IntegrationPhaseRunning } if err = action.updateIntegrationPhaseAndReadyCondition( - ctx, environment, integration, pendingPods.Items, runningPods.Items, + ctx, controller, environment, integration, pendingPods.Items, runningPods.Items, ); err != nil { return nil, err } @@ -255,6 +294,9 @@ type controller interface { checkReadyCondition(ctx context.Context) (bool, error) getPodSpec() corev1.PodSpec updateReadyCondition(readyPods int) bool + getSelector() metav1.LabelSelector + isEmptySelector() bool + getControllerName() string } func (action *monitorAction) newController(env *trait.Environment, integration *v1.Integration) (controller, error) { @@ -311,7 +353,7 @@ func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object { } func (action *monitorAction) updateIntegrationPhaseAndReadyCondition( - ctx context.Context, environment *trait.Environment, integration *v1.Integration, + ctx context.Context, controller controller, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod, ) error { controller, err := action.newController(environment, integration) diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go index 1620a66c3..f5b9a6419 100644 --- a/pkg/controller/integration/monitor_cronjob.go +++ b/pkg/controller/integration/monitor_cronjob.go @@ -23,6 +23,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime/pkg/client" @@ -110,3 +111,18 @@ func (c *cronJobController) updateReadyCondition(readyPods int) bool { return false } + +func (c *cronJobController) getSelector() metav1.LabelSelector { + // We use all the labels which will be transferred to the Pod generated + return metav1.LabelSelector{ + MatchLabels: c.obj.Spec.JobTemplate.Spec.Template.Labels, + } +} + +func (c *cronJobController) isEmptySelector() bool { + return c.obj.Spec.JobTemplate.Spec.Template.Labels == nil +} + +func (c *cronJobController) getControllerName() string { + return fmt.Sprintf("CronJob/%s", c.obj.Name) +} diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go index e2f823c16..e3325f8ea 100644 --- a/pkg/controller/integration/monitor_deployment.go +++ b/pkg/controller/integration/monitor_deployment.go @@ -23,6 +23,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util/kubernetes" @@ -91,3 +92,15 @@ func (c *deploymentController) updateReadyCondition(readyPods int) bool { return false } + +func (c *deploymentController) getSelector() metav1.LabelSelector { + return *c.obj.Spec.Selector +} + +func (c *deploymentController) isEmptySelector() bool { + return c.obj.Spec.Selector.MatchExpressions == nil && c.obj.Spec.Selector.MatchLabels == nil +} + +func (c *deploymentController) getControllerName() string { + return fmt.Sprintf("Deployment/%s", c.obj.Name) +} diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go index 06b7dc82b..ed614f1a1 100644 --- a/pkg/controller/integration/monitor_knative.go +++ b/pkg/controller/integration/monitor_knative.go @@ -19,8 +19,10 @@ package integration import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" servingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -63,3 +65,18 @@ func (c *knativeServiceController) updateReadyCondition(readyPods int) bool { return false } + +func (c *knativeServiceController) getSelector() metav1.LabelSelector { + // We use all the labels which will be transferred to the Pod generated + return metav1.LabelSelector{ + MatchLabels: c.obj.Spec.Template.Labels, + } +} + +func (c *knativeServiceController) isEmptySelector() bool { + return c.obj.Spec.Template.Labels == nil +} + +func (c *knativeServiceController) getControllerName() string { + return fmt.Sprintf("KnativeService/%s", c.obj.Name) +} diff --git a/pkg/controller/integration/monitor_synthetic.go b/pkg/controller/integration/monitor_synthetic.go new file mode 100644 index 000000000..a10a03deb --- /dev/null +++ b/pkg/controller/integration/monitor_synthetic.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/trait" +) + +// NewMonitorSyntheticAction is an action used to monitor synthetic Integrations. +func NewMonitorSyntheticAction() Action { + return &monitorSyntheticAction{} +} + +type monitorSyntheticAction struct { + monitorAction +} + +func (action *monitorSyntheticAction) Name() string { + return "monitor-synthetic" +} + +func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { + environment, err := trait.NewSyntheticEnvironment(ctx, action.client, integration, nil) + if err != nil { + if k8serrors.IsNotFound(err) { + // Not an error: the resource from which we imported has been deleted, report in it status. + // It may be a temporary situation, for example, if the deployment from which the Integration is imported + // is being redeployed. For this reason we should keep the Integration instead of forcefully removing it. + message := fmt.Sprintf( + "import %s %s no longer available", + integration.Annotations[v1.IntegrationImportedKindLabel], + integration.Annotations[v1.IntegrationImportedNameLabel], + ) + action.L.Info(message) + integration.SetReadyConditionError(message) + zero := int32(0) + integration.Status.Phase = v1.IntegrationPhaseImportMissing + integration.Status.Replicas = &zero + return integration, nil + } + // report the error + integration.Status.Phase = v1.IntegrationPhaseError + integration.SetReadyCondition(corev1.ConditionFalse, v1.IntegrationConditionImportingKindAvailableReason, err.Error()) + return integration, err + } + + return action.monitorPods(ctx, environment, integration) +} diff --git a/pkg/controller/integration/predicate.go b/pkg/controller/integration/predicate.go index 79d61556a..0feb71fec 100644 --- a/pkg/controller/integration/predicate.go +++ b/pkg/controller/integration/predicate.go @@ -21,6 +21,7 @@ import ( "reflect" "k8s.io/apimachinery/pkg/api/equality" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -55,3 +56,39 @@ func (StatusChangedPredicate) Update(e event.UpdateEvent) bool { return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface()) } + +// NonManagedObjectPredicate implements a generic update predicate function for managed object. +type NonManagedObjectPredicate struct { + predicate.Funcs +} + +// Create --. +func (NonManagedObjectPredicate) Create(e event.CreateEvent) bool { + return !isManagedObject(e.Object) +} + +// Update --. +func (NonManagedObjectPredicate) Update(e event.UpdateEvent) bool { + return !isManagedObject(e.ObjectNew) +} + +// Delete --. +func (NonManagedObjectPredicate) Delete(e event.DeleteEvent) bool { + return !isManagedObject(e.Object) +} + +// Generic --. +func (NonManagedObjectPredicate) Generic(e event.GenericEvent) bool { + return !isManagedObject(e.Object) +} + +// isManagedObject returns true if the object is managed by an Integration. +func isManagedObject(obj ctrl.Object) bool { + for _, mr := range obj.GetOwnerReferences() { + if mr.APIVersion == "camel.apache.org/v1" && + mr.Kind == "Integration" { + return true + } + } + return false +} diff --git a/pkg/trait/camel.go b/pkg/trait/camel.go index 2a4e7b3f4..71a24550e 100644 --- a/pkg/trait/camel.go +++ b/pkg/trait/camel.go @@ -64,7 +64,8 @@ func (t *camelTrait) Configure(e *Environment) (bool, *TraitCondition, error) { t.RuntimeVersion = determineRuntimeVersion(e) } - return true, nil, nil + // Don't run this trait for a synthetic Integration + return e.Integration == nil || !e.Integration.IsSynthetic(), nil, nil } func (t *camelTrait) Apply(e *Environment) error { diff --git a/pkg/trait/platform.go b/pkg/trait/platform.go index 58e545597..ec3fb1e04 100644 --- a/pkg/trait/platform.go +++ b/pkg/trait/platform.go @@ -73,7 +73,8 @@ func (t *platformTrait) Configure(e *Environment) (bool, *TraitCondition, error) } } - return true, nil, nil + // Don't run this trait for a synthetic Integration + return e.Integration == nil || !e.Integration.IsSynthetic(), nil, nil } func (t *platformTrait) Apply(e *Environment) error { diff --git a/pkg/trait/trait.go b/pkg/trait/trait.go index 33676616f..16794ee12 100644 --- a/pkg/trait/trait.go +++ b/pkg/trait/trait.go @@ -24,13 +24,15 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/platform" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/log" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + serving "knative.dev/serving/pkg/apis/serving/v1" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" ) func Apply(ctx context.Context, c client.Client, integration *v1.Integration, kit *v1.IntegrationKit) (*Environment, error) { @@ -97,7 +99,7 @@ func newEnvironment(ctx context.Context, c client.Client, integration *v1.Integr return nil, errors.New("neither integration nor kit are set") } - var obj k8sclient.Object + var obj ctrl.Object if integration != nil { obj = integration } else if kit != nil { @@ -134,3 +136,68 @@ func newEnvironment(ctx context.Context, c client.Client, integration *v1.Integr return &env, nil } + +// NewSyntheticEnvironment creates an environment suitable for a synthetic Integration. +func NewSyntheticEnvironment(ctx context.Context, c client.Client, integration *v1.Integration, kit *v1.IntegrationKit) (*Environment, error) { + if integration == nil && kit == nil { + return nil, errors.New("neither integration nor kit are set") + } + + env := Environment{ + Ctx: ctx, + Platform: nil, + Client: c, + IntegrationKit: kit, + Integration: integration, + ExecutedTraits: make([]Trait, 0), + Resources: kubernetes.NewCollection(), + EnvVars: make([]corev1.EnvVar, 0), + ApplicationProperties: make(map[string]string), + } + + catalog := NewCatalog(c) + // set the catalog + env.Catalog = catalog + // we need to simulate the execution of the traits to fill certain values used later by monitoring + _, err := catalog.apply(&env) + if err != nil { + return nil, fmt.Errorf("error during trait customization: %w", err) + } + camelApp, err := getCamelAppObject( + ctx, + c, + integration.Annotations[v1.IntegrationImportedKindLabel], + integration.Namespace, + integration.Annotations[v1.IntegrationImportedNameLabel], + ) + if err != nil { + return nil, err + } + env.Resources.Add(camelApp) + + return &env, nil +} + +func getCamelAppObject(ctx context.Context, c client.Client, kind, namespace, name string) (ctrl.Object, error) { + switch kind { + case "Deployment": + return c.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + case "CronJob": + return c.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{}) + case "KnativeService": + ksvc := &serving.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: serving.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + err := c.Get(ctx, ctrl.ObjectKeyFromObject(ksvc), ksvc) + return ksvc, err + default: + return nil, fmt.Errorf("cannot create a synthetic environment for %s kind", kind) + } +}