This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
     new 3f32342  Add a -w flag to kamel install #135
3f32342 is described below

commit 3f323423c8d5478f439d8698ef5e419d0ed09b69
Author: lburgazzoli <lburgazz...@gmail.com>
AuthorDate: Wed Jan 16 13:43:44 2019 +0100

    Add a -w flag to kamel install #135
---
 pkg/cmd/install.go                               | 34 +++++++++
 pkg/cmd/run.go                                   |  2 +-
 pkg/util/kubernetes/customclient/customclient.go | 18 +++++
 pkg/util/watch/watch.go                          | 88 ++++++++++++++++++++----
 4 files changed, 128 insertions(+), 14 deletions(-)

diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go
index f9bbff2..29b7015 100644
--- a/pkg/cmd/install.go
+++ b/pkg/cmd/install.go
@@ -21,6 +21,9 @@ import (
     "fmt"
     "strings"
 
+    "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+    "github.com/apache/camel-k/pkg/util/watch"
+
     "github.com/apache/camel-k/pkg/client"
     "github.com/apache/camel-k/pkg/install"
     "github.com/apache/camel-k/pkg/util/kubernetes"
@@ -40,6 +43,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
         RunE:  impl.install,
     }
 
+    cmd.Flags().BoolVarP(&impl.wait, "wait", "w", false, "Waits for the platform to be running")
     cmd.Flags().BoolVar(&impl.clusterSetupOnly, "cluster-setup", false, "Execute cluster-wide operations only (may require admin rights)")
     cmd.Flags().BoolVar(&impl.skipClusterSetup, "skip-cluster-setup", false, "Skip the cluster-setup phase")
     cmd.Flags().BoolVar(&impl.exampleSetup, "example", false, "Install example integration")
@@ -66,6 +70,7 @@ func newCmdInstall(rootCmdOptions *RootCmdOptions) *cobra.Command {
 type installCmdOptions struct {
     *RootCmdOptions
+    wait             bool
     clusterSetupOnly bool
     skipClusterSetup bool
     exampleSetup     bool
@@ -155,6 +160,13 @@ func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error {
         }
 
         if collection == nil {
+            if o.wait {
+                err = o.waitForPlatformReady(platform)
+                if err != nil {
+                    return err
+                }
+            }
+
             fmt.Println("Camel K installed in namespace", namespace)
         }
     }
@@ -186,3 +198,25 @@ func (o *installCmdOptions) printOutput(collection *kubernetes.Collection) error
     }
     return nil
 }
+
+func (o *installCmdOptions) waitForPlatformReady(platform *v1alpha1.IntegrationPlatform) error {
+    handler := func(i *v1alpha1.IntegrationPlatform) bool {
+        if i.Status.Phase != "" {
+            fmt.Println("platform \""+platform.Name+"\" in phase", i.Status.Phase)
+
+            if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
+                // TODO display some error info when available in the status
+                return false
+            }
+
+            if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseError {
+                fmt.Println("platform installation failed")
+                return false
+            }
+        }
+
+        return true
+    }
+
+    return watch.HandlePlatformStateChanges(o.Context, platform, handler)
+}
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 079016e..b4c34a2 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -227,7 +227,7 @@ func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integratio
         return true
     }
 
-    return watch.HandleStateChanges(o.Context, integration, handler)
+    return watch.HandleIntegrationStateChanges(o.Context, integration, handler)
 }
 
 func (o *runCmdOptions) syncIntegration(c client.Client, sources []string) error {
diff --git a/pkg/util/kubernetes/customclient/customclient.go b/pkg/util/kubernetes/customclient/customclient.go
index 345be1d..3d41155 100644
--- a/pkg/util/kubernetes/customclient/customclient.go
+++ b/pkg/util/kubernetes/customclient/customclient.go
@@ -18,6 +18,7 @@ limitations under the License.
 package customclient
 
 import (
+    "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"
@@ -65,3 +66,20 @@ func GetDynamicClientFor(group string, version string, kind string, namespace st
         Resource: kind,
     }).Namespace(namespace), nil
 }
+
+// GetDefaultDynamicClientFor returns a dynamic client for a given kind
+func GetDefaultDynamicClientFor(kind string, namespace string) (dynamic.ResourceInterface, error) {
+    conf, err := config.GetConfig()
+    if err != nil {
+        return nil, err
+    }
+    dynamicClient, err := dynamic.NewForConfig(conf)
+    if err != nil {
+        return nil, err
+    }
+    return dynamicClient.Resource(schema.GroupVersionResource{
+        Group:    v1alpha1.SchemeGroupVersion.Group,
+        Version:  v1alpha1.SchemeGroupVersion.Version,
+        Resource: kind,
+    }).Namespace(namespace), nil
+}
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index dcf35ae..f1bcd84 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -20,19 +20,20 @@ package watch
 import (
     "context"
 
+    "github.com/apache/camel-k/pkg/util/kubernetes"
+
     "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
     "github.com/apache/camel-k/pkg/util/kubernetes/customclient"
     "github.com/sirupsen/logrus"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/json"
 )
 
 //
-// HandleStateChanges watches a integration resource and invoke the given handler when its status changes.
+// HandleIntegrationStateChanges watches a integration resource and invoke the given handler when its status changes.
 //
-// err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
+// err := watch.HandleIntegrationStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool {
 //         if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
 //             return false
 //         }
 //
 //         return true
 //     })
 //
@@ -42,8 +43,8 @@ import (
 //
 // This function blocks until the handler function returns true or either the events channel or the context is closed.
 //
-func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
-    dynamicClient, err := customclient.GetDynamicClientFor(v1alpha1.SchemeGroupVersion.Group, v1alpha1.SchemeGroupVersion.Version, "integrations", integration.Namespace)
+func HandleIntegrationStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error {
+    dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrations", integration.Namespace)
     if err != nil {
         return err
     }
@@ -70,23 +71,84 @@ func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration,
         if e.Object != nil {
             if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
-                unstr := unstructured.Unstructured{
-                    Object: runtimeUnstructured.UnstructuredContent(),
+                jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
+                if err != nil {
+                    return err
+                }
+                copy := integration.DeepCopy()
+                err = json.Unmarshal(jsondata, copy)
+                if err != nil {
+                    logrus.Error("Unexpected error detected when watching resource", err)
+                    return nil
                 }
-                jsondata, err := unstr.MarshalJSON()
+
+                if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
+                    lastObservedState = &copy.Status.Phase
+                    if !handler(copy) {
+                        return nil
+                    }
+                }
+            }
+        }
+    }
+}
+
+//
+// HandlePlatformStateChanges watches a platform resource and invoke the given handler when its status changes.
+//
+// err := watch.HandlePlatformStateChanges(ctx, platform, func(i *v1alpha1.IntegrationPlatform) bool {
+//         if i.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
+//             return false
+//         }
+//
+//         return true
+//     })
+//
+// This function blocks until the handler function returns true or either the events channel or the context is closed.
+//
+func HandlePlatformStateChanges(ctx context.Context, platform *v1alpha1.IntegrationPlatform, handler func(platform *v1alpha1.IntegrationPlatform) bool) error {
+    dynamicClient, err := customclient.GetDefaultDynamicClientFor("integrationplatforms", platform.Namespace)
+    if err != nil {
+        return err
+    }
+    watcher, err := dynamicClient.Watch(metav1.ListOptions{
+        FieldSelector: "metadata.name=" + platform.Name,
+    })
+    if err != nil {
+        return err
+    }
+
+    defer watcher.Stop()
+    events := watcher.ResultChan()
+
+    var lastObservedState *v1alpha1.IntegrationPlatformPhase
+
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        case e, ok := <-events:
+            if !ok {
+                return nil
+            }
+
+            if e.Object != nil {
+                if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok {
+                    jsondata, err := kubernetes.ToJSON(runtimeUnstructured)
                     if err != nil {
                         return err
                     }
-                    icopy := integration.DeepCopy()
-                    err = json.Unmarshal(jsondata, icopy)
+                    copy := platform.DeepCopy()
+                    err = json.Unmarshal(jsondata, copy)
                     if err != nil {
                         logrus.Error("Unexpected error detected when watching resource", err)
                         return nil
                     }
 
-                    if lastObservedState == nil || *lastObservedState != icopy.Status.Phase {
-                        lastObservedState = &icopy.Status.Phase
-                        if !handler(icopy) {
+                    if lastObservedState == nil || *lastObservedState != copy.Status.Phase {
+                        lastObservedState = &copy.Status.Phase
+                        if !handler(copy) {
                             return nil
                         }
                     }
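For readers following along, here is a minimal sketch (not part of the commit above) of how the new watch.HandlePlatformStateChanges helper could be reused outside the install command. It assumes a *v1alpha1.IntegrationPlatform already fetched from the cluster, and it bounds the wait with a context timeout, since the helper keeps watching until the handler returns false or the context/event channel is closed; the waitForReady wrapper name, the five-minute limit, and the logging are illustrative only.

package example

import (
    "context"
    "log"
    "time"

    "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
    "github.com/apache/camel-k/pkg/util/watch"
)

// waitForReady is a hypothetical wrapper around the new helper: it waits at most
// five minutes for the given platform to reach a terminal phase.
func waitForReady(platform *v1alpha1.IntegrationPlatform) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    return watch.HandlePlatformStateChanges(ctx, platform, func(p *v1alpha1.IntegrationPlatform) bool {
        log.Println("observed phase:", p.Status.Phase)
        // Returning false stops the watch; keep watching until Ready or Error is reported.
        return p.Status.Phase != v1alpha1.IntegrationPlatformPhaseReady &&
            p.Status.Phase != v1alpha1.IntegrationPlatformPhaseError
    })
}

From the command line, the same behaviour is exposed by the new flag: kamel install --wait (or kamel install -w) prints the observed platform phases and stops waiting once the platform reaches the Ready phase or reports an Error.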