This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit adf3eadec6d5b38590643e274f08c3240ea250c6 Author: nferraro <ni.ferr...@gmail.com> AuthorDate: Mon Jan 7 14:37:39 2019 +0100 Fix #237: fix installation from scratch --- pkg/client/client.go | 23 ++++++++--------- pkg/cmd/install.go | 22 ++++++++-------- pkg/cmd/root.go | 9 +++++-- pkg/install/cluster.go | 67 +++++++++++++++++++++++++++++++++++++++++++------ pkg/install/operator.go | 20 --------------- test/testing_env.go | 10 +++++--- 6 files changed, 94 insertions(+), 57 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 7a0c73a..cade334 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" + clientscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest" @@ -49,6 +50,11 @@ type Injectable interface { InjectClient(Client) } +// Provider is used to provide a new instance of the Client each time it's required +type Provider struct { + Get func() (Client, error) +} + type defaultClient struct { controller.Client kubernetes.Interface @@ -68,30 +74,21 @@ func NewOutOfClusterClient(kubeconfig string) (Client, error) { return nil, err } - options := manager.Options{ - LeaderElection: false, - } - - // Create a new Cmd to provide shared dependencies and start components - mgr, err := manager.New(cfg, options) - if err != nil { - return nil, err - } + scheme := clientscheme.Scheme // Setup Scheme for all resources - if err := apis.AddToScheme(mgr.GetScheme()); err != nil { + if err := apis.AddToScheme(scheme); err != nil { return nil, err } var clientset kubernetes.Interface - if clientset, err = kubernetes.NewForConfig(mgr.GetConfig()); err != nil { + if clientset, err = kubernetes.NewForConfig(cfg); err != nil { return nil, err } // Create a new client to avoid using cache (enabled by default on operator-sdk client) clientOptions := controller.Options{ - Scheme: mgr.GetScheme(), - Mapper: mgr.GetRESTMapper(), + Scheme: scheme, } dynClient, err := controller.New(cfg, clientOptions) if err != nil { diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go index c097cb6..342c4a9 100644 --- a/pkg/cmd/install.go +++ b/pkg/cmd/install.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/install" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/pkg/errors" @@ -68,22 +69,16 @@ type installCmdOptions struct { } func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error { - // TODO verify if this is needed by running a installation from scratch - // Let's use a fast refresh period when running with the CLI - // k8sclient.ResetCacheEvery(8 * time.Second) - - c, err := o.GetCmdClient() - if err != nil { - return err - } - var collection *kubernetes.Collection if o.outputFormat != "" { collection = kubernetes.NewCollection() } if !o.skipClusterSetup { - err := install.SetupClusterwideResourcesOrCollect(o.Context, c, collection) + // Let's use a client provider during cluster installation, to eliminate the problem of CRD object caching + clientProvider := client.Provider{Get: o.NewCmdClient} + + err := install.SetupClusterwideResourcesOrCollect(o.Context, clientProvider, collection) if err != nil && k8serrors.IsForbidden(err) { fmt.Println("Current user is not authorized to create cluster-wide objects like custom resource definitions or cluster roles: ", err) @@ -99,9 +94,14 @@ func (o *installCmdOptions) install(cmd *cobra.Command, args []string) error { fmt.Println("Camel K cluster setup completed successfully") } } else { + c, err := o.GetCmdClient() + if err != nil { + return err + } + namespace := o.Namespace - err := install.OperatorOrCollect(o.Context, c, namespace, collection) + err = install.OperatorOrCollect(o.Context, c, namespace, collection) if err != nil { return err } diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go index c718d94..bf6aeef 100644 --- a/pkg/cmd/root.go +++ b/pkg/cmd/root.go @@ -81,13 +81,18 @@ func (command *RootCmdOptions) preRun(cmd *cobra.Command, args []string) error { return nil } -// GetCmdClient returns a client that can be used from command line tools +// GetCmdClient returns the client that can be used from command line tools func (command *RootCmdOptions) GetCmdClient() (client.Client, error) { // Get the pre-computed client if command._client != nil { return command._client, nil } var err error - command._client, err = client.NewOutOfClusterClient(command.KubeConfig) + command._client, err = command.NewCmdClient() return command._client, err } + +// NewCmdClient returns a new client that can be used from command line tools +func (command *RootCmdOptions) NewCmdClient() (client.Client, error) { + return client.NewOutOfClusterClient(command.KubeConfig) +} diff --git a/pkg/install/cluster.go b/pkg/install/cluster.go index 7adf008..31d1096 100644 --- a/pkg/install/cluster.go +++ b/pkg/install/cluster.go @@ -19,25 +19,33 @@ package install import ( "context" + "errors" + "strconv" + "time" "github.com/apache/camel-k/deploy" "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/kubernetes/customclient" "k8s.io/api/rbac/v1" - "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/yaml" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) // SetupClusterwideResources -- -func SetupClusterwideResources(ctx context.Context, c client.Client) error { - return SetupClusterwideResourcesOrCollect(ctx, c, nil) +func SetupClusterwideResources(ctx context.Context, clientProvider client.Provider) error { + return SetupClusterwideResourcesOrCollect(ctx, clientProvider, nil) } // SetupClusterwideResourcesOrCollect -- -func SetupClusterwideResourcesOrCollect(ctx context.Context, c client.Client, collection *kubernetes.Collection) error { +func SetupClusterwideResourcesOrCollect(ctx context.Context, clientProvider client.Provider, collection *kubernetes.Collection) error { + // Get a client to install the CRD + c, err := clientProvider.Get() + if err != nil { + return err + } // Install CRD for Integration Platform (if needed) if err := installCRD(ctx, c, "IntegrationPlatform", "crd-integration-platform.yaml", collection); err != nil { @@ -66,13 +74,56 @@ func SetupClusterwideResourcesOrCollect(ctx context.Context, c client.Client, co } } + // Wait for all CRDs to be installed before proceeding + if err := WaitForAllCRDInstallation(ctx, clientProvider, 25*time.Second); err != nil { + return err + } + return nil } -// IsCRDInstalled check if the given CRT kind is installed +// WaitForAllCRDInstallation waits until all CRDs are installed +func WaitForAllCRDInstallation(ctx context.Context, clientProvider client.Provider, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for { + var c client.Client + var err error + if c, err = clientProvider.Get(); err != nil { + return err + } + var inst bool + if inst, err = AreAllCRDInstalled(ctx, c); err != nil { + return err + } else if inst { + return nil + } + // Check after 2 seconds if not expired + if time.Now().After(deadline) { + return errors.New("cannot check CRD installation after " + strconv.FormatInt(timeout.Nanoseconds()/1000000000, 10) + " seconds") + } + time.Sleep(2 * time.Second) + } +} + +// AreAllCRDInstalled check if all the required CRDs are installed +func AreAllCRDInstalled(ctx context.Context, c client.Client) (bool, error) { + if ok, err := IsCRDInstalled(ctx, c, "IntegrationPlatform"); err != nil { + return ok, err + } else if !ok { + return false, nil + } + if ok, err := IsCRDInstalled(ctx, c, "IntegrationContext"); err != nil { + return ok, err + } else if !ok { + return false, nil + } + return IsCRDInstalled(ctx, c, "Integration") +} + +// IsCRDInstalled check if the given CRD kind is installed func IsCRDInstalled(ctx context.Context, c client.Client, kind string) (bool, error) { lst, err := c.Discovery().ServerResourcesForGroupVersion("camel.apache.org/v1alpha1") - if err != nil && errors.IsNotFound(err) { + if err != nil && k8serrors.IsNotFound(err) { return false, nil } else if err != nil { return false, err @@ -120,7 +171,7 @@ func installCRD(ctx context.Context, c client.Client, kind string, resourceName Resource("customresourcedefinitions"). Do() // Check result - if result.Error() != nil && !errors.IsAlreadyExists(result.Error()) { + if result.Error() != nil && !k8serrors.IsAlreadyExists(result.Error()) { return result.Error() } @@ -143,7 +194,7 @@ func IsClusterRoleInstalled(ctx context.Context, c client.Client) (bool, error) return false, err } err = c.Get(ctx, key, &clusterRole) - if err != nil && errors.IsNotFound(err) { + if err != nil && k8serrors.IsNotFound(err) { return false, nil } else if err != nil { return false, err diff --git a/pkg/install/operator.go b/pkg/install/operator.go index 1c7f871..365592f 100644 --- a/pkg/install/operator.go +++ b/pkg/install/operator.go @@ -20,8 +20,6 @@ package install import ( "context" "errors" - "strconv" - "time" "github.com/apache/camel-k/deploy" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -30,7 +28,6 @@ import ( "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/minishift" "github.com/apache/camel-k/pkg/util/openshift" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) // Operator installs the operator resources in the given namespace @@ -100,9 +97,6 @@ func Platform(ctx context.Context, c client.Client, namespace string, registry s // PlatformOrCollect -- // nolint: lll func PlatformOrCollect(ctx context.Context, c client.Client, namespace string, registry string, organization string, pushSecret string, collection *kubernetes.Collection) (*v1alpha1.IntegrationPlatform, error) { - if err := waitForPlatformCRDAvailable(ctx, c, namespace, 25*time.Second); err != nil { - return nil, err - } isOpenshift, err := openshift.IsOpenShift(c) if err != nil { return nil, err @@ -143,20 +137,6 @@ func PlatformOrCollect(ctx context.Context, c client.Client, namespace string, r return pl, nil } -func waitForPlatformCRDAvailable(ctx context.Context, c client.Client, namespace string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for { - pla := v1alpha1.NewIntegrationPlatformList() - if err := c.List(ctx, &k8sclient.ListOptions{Namespace: namespace}, &pla); err == nil { - return nil - } - if time.Now().After(deadline) { - return errors.New("cannot list integration platforms after " + strconv.FormatInt(timeout.Nanoseconds()/1000000000, 10) + " seconds") - } - time.Sleep(2 * time.Second) - } -} - // Example -- func Example(ctx context.Context, c client.Client, namespace string) error { return ExampleOrCollect(ctx, c, namespace, nil) diff --git a/test/testing_env.go b/test/testing_env.go index 3d393ed..dbeae00 100644 --- a/test/testing_env.go +++ b/test/testing_env.go @@ -38,15 +38,19 @@ import ( var testContext context.Context var testClient client.Client +func newTestClient() (client.Client, error) { + return client.NewOutOfClusterClient("") +} + func init() { - testContext = context.TODO() var err error - testClient, err = client.NewOutOfClusterClient("") + err = install.SetupClusterwideResources(testContext, client.Provider{Get: newTestClient}) if err != nil { panic(err) } - err = install.SetupClusterwideResources(testContext, testClient) + testContext = context.TODO() + testClient, err = newTestClient() if err != nil { panic(err) }