This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 9f03af22c76b29e785833959765fe965a738b5f5
Author: Nicola Ferraro <ni.ferr...@gmail.com>
AuthorDate: Tue Jan 28 00:46:47 2020 +0100

    #1199: logging status information in dev and wait mode
---
 deploy/operator-role-kubernetes.yaml               |   1 +
 deploy/operator-role-olm.yaml                      |   1 +
 deploy/operator-role-openshift.yaml                |   1 +
 e2e/dev_mode_test.go                               |   5 +-
 pkg/cmd/install.go                                 |  23 ++--
 pkg/cmd/run.go                                     |  24 ++--
 pkg/controller/build/build_controller.go           |   8 +-
 .../integration/integration_controller.go          |   8 +-
 .../integrationkit/integrationkit_controller.go    |   8 +-
 .../integrationplatform_controller.go              |   8 +-
 pkg/{events => event}/manager.go                   |   2 +-
 pkg/trait/deployment_test.go                       |   2 +-
 pkg/util/watch/watch.go                            | 132 ++++++++++++++++++++-
 13 files changed, 172 insertions(+), 51 deletions(-)

diff --git a/deploy/operator-role-kubernetes.yaml 
b/deploy/operator-role-kubernetes.yaml
index 4e9db4c..bca4878 100644
--- a/deploy/operator-role-kubernetes.yaml
+++ b/deploy/operator-role-kubernetes.yaml
@@ -66,6 +66,7 @@ rules:
   resources:
   - events
   verbs:
+  - create
   - get
   - list
   - watch
diff --git a/deploy/operator-role-olm.yaml b/deploy/operator-role-olm.yaml
index a213197..18a6f91 100644
--- a/deploy/operator-role-olm.yaml
+++ b/deploy/operator-role-olm.yaml
@@ -66,6 +66,7 @@ rules:
   resources:
   - events
   verbs:
+  - create
   - get
   - list
   - watch
diff --git a/deploy/operator-role-openshift.yaml 
b/deploy/operator-role-openshift.yaml
index 9da5132..46ea8c5 100644
--- a/deploy/operator-role-openshift.yaml
+++ b/deploy/operator-role-openshift.yaml
@@ -66,6 +66,7 @@ rules:
   resources:
   - events
   verbs:
+  - create
   - get
   - list
   - watch
diff --git a/e2e/dev_mode_test.go b/e2e/dev_mode_test.go
index fd057e9..a028917 100644
--- a/e2e/dev_mode_test.go
+++ b/e2e/dev_mode_test.go
@@ -48,11 +48,12 @@ func TestRunDevMode(t *testing.T) {
                        kamelRun := kamelWithContext(ctx, "run", "-n", ns, 
file, "--dev")
                        kamelRun.SetOut(pipew)
 
-                       logScanner := util.NewLogScanner(ctx, piper, 
"Magicstring!", "Magicjordan!")
+                       logScanner := util.NewLogScanner(ctx, piper, 
"Integration yaml in phase Running", "Magicstring!", "Magicjordan!")
 
                        go kamelRun.Execute()
 
-                       Eventually(logScanner.IsFound("Magicstring!"), 
5*time.Minute).Should(BeTrue())
+                       Eventually(logScanner.IsFound("Integration yaml in 
phase Running"), 5*time.Minute).Should(BeTrue())
+                       Eventually(logScanner.IsFound("Magicstring!"), 
3*time.Minute).Should(BeTrue())
                        
Expect(logScanner.IsFound("Magicjordan!")()).To(BeFalse())
 
                        util.ReplaceInFile(t, file, "string!", "jordan!")
diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go
index b0441dc..248d7d7 100644
--- a/pkg/cmd/install.go
+++ b/pkg/cmd/install.go
@@ -366,7 +366,7 @@ func (o *installCmdOptions) install(cobraCmd 
*cobra.Command, _ []string) error {
 
                if collection == nil {
                        if o.Wait {
-                               err = o.waitForPlatformReady(platform)
+                               err = o.waitForPlatformReady(cobraCmd, platform)
                                if err != nil {
                                        return err
                                }
@@ -418,25 +418,20 @@ func (o *installCmdOptions) printOutput(collection 
*kubernetes.Collection) error
        return nil
 }
 
-func (o *installCmdOptions) waitForPlatformReady(platform 
*v1.IntegrationPlatform) error {
+func (o *installCmdOptions) waitForPlatformReady(cmd *cobra.Command, platform 
*v1.IntegrationPlatform) error {
        handler := func(i *v1.IntegrationPlatform) bool {
-               if i.Status.Phase != "" {
-                       fmt.Println("platform \""+platform.Name+"\" in phase", 
i.Status.Phase)
-
-                       if i.Status.Phase == v1.IntegrationPlatformPhaseReady {
-                               // TODO display some error info when available 
in the status
-                               return false
-                       }
-
-                       if i.Status.Phase == v1.IntegrationPlatformPhaseError {
-                               fmt.Println("platform installation failed")
-                               return false
-                       }
+               if i.Status.Phase == v1.IntegrationPlatformPhaseReady || 
i.Status.Phase == v1.IntegrationPlatformPhaseError {
+                       return false
                }
 
                return true
        }
 
+       go watch.HandleIntegrationPlatformEvents(o.Context, platform, 
func(event *corev1.Event) bool {
+               fmt.Fprintln(cmd.OutOrStdout(), event.Message)
+               return true
+       })
+
        return watch.HandlePlatformStateChanges(o.Context, platform, handler)
 }
 
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index b529955..bb7b6f3 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -43,6 +43,7 @@ import (
        "github.com/magiconair/properties"
        "github.com/pkg/errors"
        "github.com/spf13/cobra"
+       corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -245,7 +246,7 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
        }
        if o.Wait || o.Dev {
                for {
-                       integrationPhase, err := 
o.waitForIntegrationReady(integration)
+                       integrationPhase, err := o.waitForIntegrationReady(cmd, 
integration)
                        if err != nil {
                                return err
                        }
@@ -253,7 +254,6 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
                        if integrationPhase == nil || *integrationPhase == 
v1.IntegrationPhaseError {
                                return fmt.Errorf("integration \"%s\" 
deployment failed", integration.Name)
                        } else if *integrationPhase == 
v1.IntegrationPhaseRunning {
-                               fmt.Println("Running")
                                break
                        }
 
@@ -297,27 +297,23 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
        return nil
 }
 
-func (o *runCmdOptions) waitForIntegrationReady(integration *v1.Integration) 
(*v1.IntegrationPhase, error) {
+func (o *runCmdOptions) waitForIntegrationReady(cmd *cobra.Command, 
integration *v1.Integration) (*v1.IntegrationPhase, error) {
        handler := func(i *v1.Integration) bool {
                //
                // TODO when we add health checks, we should Wait until they 
are passed
                //
-               if i.Status.Phase != "" {
-                       fmt.Println("integration \""+integration.Name+"\" in 
phase", i.Status.Phase)
-
-                       if i.Status.Phase == v1.IntegrationPhaseRunning {
-                               // TODO display some error info when available 
in the status
-                               return false
-                       }
-
-                       if i.Status.Phase == v1.IntegrationPhaseError {
-                               return false
-                       }
+               if i.Status.Phase == v1.IntegrationPhaseRunning || 
i.Status.Phase == v1.IntegrationPhaseError {
+                       return false
                }
 
                return true
        }
 
+       go watch.HandleIntegrationEvents(o.Context, integration, func(event 
*corev1.Event) bool {
+               fmt.Fprintln(cmd.OutOrStdout(), event.Message)
+               return true
+       })
+
        return watch.HandleIntegrationStateChanges(o.Context, integration, 
handler)
 }
 
diff --git a/pkg/controller/build/build_controller.go 
b/pkg/controller/build/build_controller.go
index ecb2995..092dc68 100644
--- a/pkg/controller/build/build_controller.go
+++ b/pkg/controller/build/build_controller.go
@@ -22,7 +22,7 @@ import (
        "sync"
        "time"
 
-       "github.com/apache/camel-k/pkg/events"
+       camelevent "github.com/apache/camel-k/pkg/event"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
@@ -208,13 +208,13 @@ func (r *ReconcileBuild) Reconcile(request 
reconcile.Request) (reconcile.Result,
 
                        newTarget, err := a.Handle(ctx, target)
                        if err != nil {
-                               events.NotifyBuildError(ctx, r.client, 
r.recorder, &instance, newTarget, err)
+                               camelevent.NotifyBuildError(ctx, r.client, 
r.recorder, &instance, newTarget, err)
                                return reconcile.Result{}, err
                        }
 
                        if newTarget != nil {
                                if res, err := r.update(ctx, &instance, 
newTarget); err != nil {
-                                       events.NotifyBuildError(ctx, r.client, 
r.recorder, &instance, newTarget, err)
+                                       camelevent.NotifyBuildError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
                                        return res, err
                                }
 
@@ -231,7 +231,7 @@ func (r *ReconcileBuild) Reconcile(request 
reconcile.Request) (reconcile.Result,
 
                        // handle one action at time so the resource
                        // is always at its latest state
-                       events.NotifyBuildUpdated(ctx, r.client, r.recorder, 
&instance, newTarget)
+                       camelevent.NotifyBuildUpdated(ctx, r.client, 
r.recorder, &instance, newTarget)
                        break
                }
        }
diff --git a/pkg/controller/integration/integration_controller.go 
b/pkg/controller/integration/integration_controller.go
index 205004b..1532c45 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -20,7 +20,7 @@ package integration
 import (
        "context"
 
-       "github.com/apache/camel-k/pkg/events"
+       camelevent "github.com/apache/camel-k/pkg/event"
        appsv1 "k8s.io/api/apps/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
@@ -260,13 +260,13 @@ func (r *ReconcileIntegration) Reconcile(request 
reconcile.Request) (reconcile.R
 
                        newTarget, err := a.Handle(ctx, target)
                        if err != nil {
-                               events.NotifyIntegrationError(ctx, r.client, 
r.recorder, &instance, newTarget, err)
+                               camelevent.NotifyIntegrationError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
                                return reconcile.Result{}, err
                        }
 
                        if newTarget != nil {
                                if res, err := r.update(ctx, &instance, 
newTarget); err != nil {
-                                       events.NotifyIntegrationError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
+                                       camelevent.NotifyIntegrationError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
                                        return res, err
                                }
 
@@ -281,7 +281,7 @@ func (r *ReconcileIntegration) Reconcile(request 
reconcile.Request) (reconcile.R
 
                        // handle one action at time so the resource
                        // is always at its latest state
-                       events.NotifyIntegrationUpdated(ctx, r.client, 
r.recorder, &instance, newTarget)
+                       camelevent.NotifyIntegrationUpdated(ctx, r.client, 
r.recorder, &instance, newTarget)
                        break
                }
        }
diff --git a/pkg/controller/integrationkit/integrationkit_controller.go 
b/pkg/controller/integrationkit/integrationkit_controller.go
index c86947a..dd4b271 100644
--- a/pkg/controller/integrationkit/integrationkit_controller.go
+++ b/pkg/controller/integrationkit/integrationkit_controller.go
@@ -20,7 +20,7 @@ package integrationkit
 import (
        "context"
 
-       "github.com/apache/camel-k/pkg/events"
+       camelevent "github.com/apache/camel-k/pkg/event"
        "github.com/apache/camel-k/pkg/platform"
        "k8s.io/client-go/tools/record"
 
@@ -227,13 +227,13 @@ func (r *ReconcileIntegrationKit) Reconcile(request 
reconcile.Request) (reconcil
 
                        newTarget, err := a.Handle(ctx, target)
                        if err != nil {
-                               events.NotifyIntegrationKitError(ctx, r.client, 
r.recorder, &instance, newTarget, err)
+                               camelevent.NotifyIntegrationKitError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
                                return reconcile.Result{}, err
                        }
 
                        if newTarget != nil {
                                if res, err := r.update(ctx, &instance, 
newTarget); err != nil {
-                                       events.NotifyIntegrationKitError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
+                                       
camelevent.NotifyIntegrationKitError(ctx, r.client, r.recorder, &instance, 
newTarget, err)
                                        return res, err
                                }
 
@@ -248,7 +248,7 @@ func (r *ReconcileIntegrationKit) Reconcile(request 
reconcile.Request) (reconcil
 
                        // handle one action at time so the resource
                        // is always at its latest state
-                       events.NotifyIntegrationKitUpdated(ctx, r.client, 
r.recorder, &instance, newTarget)
+                       camelevent.NotifyIntegrationKitUpdated(ctx, r.client, 
r.recorder, &instance, newTarget)
                        break
                }
        }
diff --git 
a/pkg/controller/integrationplatform/integrationplatform_controller.go 
b/pkg/controller/integrationplatform/integrationplatform_controller.go
index 1ad9a60..0f8668a 100644
--- a/pkg/controller/integrationplatform/integrationplatform_controller.go
+++ b/pkg/controller/integrationplatform/integrationplatform_controller.go
@@ -21,7 +21,7 @@ import (
        "context"
        "time"
 
-       "github.com/apache/camel-k/pkg/events"
+       camelevent "github.com/apache/camel-k/pkg/event"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/tools/record"
@@ -151,13 +151,13 @@ func (r *ReconcileIntegrationPlatform) Reconcile(request 
reconcile.Request) (rec
 
                        target, err = a.Handle(ctx, target)
                        if err != nil {
-                               events.NotifyIntegrationPlatformError(ctx, 
r.client, r.recorder, &instance, target, err)
+                               camelevent.NotifyIntegrationPlatformError(ctx, 
r.client, r.recorder, &instance, target, err)
                                return reconcile.Result{}, err
                        }
 
                        if target != nil {
                                if err := r.client.Status().Patch(ctx, target, 
k8sclient.MergeFrom(&instance)); err != nil {
-                                       
events.NotifyIntegrationPlatformError(ctx, r.client, r.recorder, &instance, 
target, err)
+                                       
camelevent.NotifyIntegrationPlatformError(ctx, r.client, r.recorder, &instance, 
target, err)
                                        return reconcile.Result{}, err
                                }
 
@@ -174,7 +174,7 @@ func (r *ReconcileIntegrationPlatform) Reconcile(request 
reconcile.Request) (rec
 
                        // handle one action at time so the resource
                        // is always at its latest state
-                       events.NotifyIntegrationPlatformUpdated(ctx, r.client, 
r.recorder, &instance, target)
+                       camelevent.NotifyIntegrationPlatformUpdated(ctx, 
r.client, r.recorder, &instance, target)
                        break
                }
        }
diff --git a/pkg/events/manager.go b/pkg/event/manager.go
similarity index 99%
rename from pkg/events/manager.go
rename to pkg/event/manager.go
index 5622d27..9e4869a 100644
--- a/pkg/events/manager.go
+++ b/pkg/event/manager.go
@@ -15,7 +15,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package events
+package event
 
 import (
        "context"
diff --git a/pkg/trait/deployment_test.go b/pkg/trait/deployment_test.go
index 7d9c666..8a6a785 100644
--- a/pkg/trait/deployment_test.go
+++ b/pkg/trait/deployment_test.go
@@ -119,7 +119,7 @@ func 
TestApplyDeploymentTraitWhileDeployingIntegrationDoesSucceed(t *testing.T)
        conditions := environment.Integration.Status.Conditions
        assert.Len(t, conditions, 1)
        assert.Equal(t, v1.IntegrationConditionDeploymentAvailable, 
conditions[0].Type)
-       assert.Equal(t, "integration-name", conditions[0].Message)
+       assert.Equal(t, "deployment name is integration-name", 
conditions[0].Message)
 }
 
 func TestApplyDeploymentTraitWhileRunningIntegrationDoesSucceed(t *testing.T) {
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index baaaa7c..289fc12 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -19,12 +19,13 @@ package watch
 
 import (
        "context"
-
-       "github.com/apache/camel-k/pkg/util/kubernetes"
-       "github.com/apache/camel-k/pkg/util/log"
+       "fmt"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
        "github.com/apache/camel-k/pkg/util/kubernetes/customclient"
+       "github.com/apache/camel-k/pkg/util/log"
+       corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/util/json"
@@ -97,6 +98,69 @@ func HandleIntegrationStateChanges(ctx context.Context, 
integration *v1.Integrat
 }
 
 //
+// HandleIntegrationEvents watches all events related to the given integration.
+//
+//     watch.HandleIntegrationEvents(o.Context, integration, func(event 
*corev1.Event) bool {
+//              println(event.Message)
+//              return true
+//        })
+//
+// This function blocks until the handler function returns false or either the 
events channel or the context is closed.
+//
+func HandleIntegrationEvents(ctx context.Context, integration *v1.Integration,
+       handler func(event *corev1.Event) bool) error {
+       dynamicClient, err := customclient.GetDynamicClientFor("", "v1", 
"events", integration.Namespace)
+       if err != nil {
+               return err
+       }
+       watcher, err := dynamicClient.Watch(metav1.ListOptions{
+               FieldSelector: fmt.Sprintf("involvedObject.kind=Integration,"+
+                       "involvedObject.apiVersion=%s,"+
+                       "involvedObject.name=%s",
+                       v1.SchemeGroupVersion.String(), integration.Name),
+       })
+       if err != nil {
+               return err
+       }
+
+       defer watcher.Stop()
+       events := watcher.ResultChan()
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return nil
+               case e, ok := <-events:
+                       if !ok {
+                               return nil
+                       }
+
+                       if e.Object != nil {
+                               if runtimeUnstructured, ok := 
e.Object.(runtime.Unstructured); ok {
+                                       jsondata, err := 
kubernetes.ToJSON(runtimeUnstructured)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       evt := corev1.Event{}
+                                       err = json.Unmarshal(jsondata, &evt)
+                                       if err != nil {
+                                               log.Error(err, "Unexpected 
error detected when watching resource")
+                                               return nil
+                                       }
+
+                                       if evt.CreationTimestamp.UnixNano() >= 
integration.CreationTimestamp.UnixNano() {
+                                               if !handler(&evt) {
+                                                       return nil
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+}
+
+
+//
 // HandlePlatformStateChanges watches a platform resource and invoke the given 
handler when its status changes.
 //
 //     err := watch.HandlePlatformStateChanges(ctx, platform, func(i 
*v1.IntegrationPlatform) bool {
@@ -159,3 +223,65 @@ func HandlePlatformStateChanges(ctx context.Context, 
platform *v1.IntegrationPla
                }
        }
 }
+
+//
+// HandleIntegrationPlatformEvents watches all events related to the given 
integration platform.
+//
+//     watch.HandleIntegrationPlatformEvents(o.Context, platform, func(event 
*corev1.Event) bool {
+//              println(event.Message)
+//              return true
+//        })
+//
+// This function blocks until the handler function returns false or either the 
events channel or the context is closed.
+//
+func HandleIntegrationPlatformEvents(ctx context.Context, p 
*v1.IntegrationPlatform,
+       handler func(event *corev1.Event) bool) error {
+       dynamicClient, err := customclient.GetDynamicClientFor("", "v1", 
"events", p.Namespace)
+       if err != nil {
+               return err
+       }
+       watcher, err := dynamicClient.Watch(metav1.ListOptions{
+               FieldSelector: 
fmt.Sprintf("involvedObject.kind=IntegrationPlatform,"+
+                       "involvedObject.apiVersion=%s,"+
+                       "involvedObject.name=%s",
+                       v1.SchemeGroupVersion.String(), p.Name),
+       })
+       if err != nil {
+               return err
+       }
+
+       defer watcher.Stop()
+       events := watcher.ResultChan()
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return nil
+               case e, ok := <-events:
+                       if !ok {
+                               return nil
+                       }
+
+                       if e.Object != nil {
+                               if runtimeUnstructured, ok := 
e.Object.(runtime.Unstructured); ok {
+                                       jsondata, err := 
kubernetes.ToJSON(runtimeUnstructured)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       evt := corev1.Event{}
+                                       err = json.Unmarshal(jsondata, &evt)
+                                       if err != nil {
+                                               log.Error(err, "Unexpected 
error detected when watching resource")
+                                               return nil
+                                       }
+
+                                       if evt.CreationTimestamp.UnixNano() >= 
p.CreationTimestamp.UnixNano() {
+                                               if !handler(&evt) {
+                                                       return nil
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+}

Reply via email to