This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push: new 92a8652 builder: add hooks to the builder state changes to update resources when the build completes (#343) 92a8652 is described below commit 92a865295a1d2b96474066483770b45248a5c32c Author: Luca Burgazzoli <lburgazz...@users.noreply.github.com> AuthorDate: Mon Jan 21 09:17:49 2019 +0100 builder: add hooks to the builder state changes to update resources when the build completes (#343) --- .../camel/v1alpha1/integration_types_support.go | 14 +++ pkg/builder/builder.go | 60 ++++++++---- pkg/builder/builder_steps.go | 76 -------------- pkg/builder/builder_types.go | 5 +- pkg/builder/kaniko/kaniko.go | 1 - pkg/builder/s2i/s2i.go | 1 - pkg/controller/integration/build_image.go | 109 ++++++++++++--------- pkg/controller/integrationcontext/build.go | 89 ++++++++++------- pkg/trait/builder.go | 2 - pkg/trait/builder_test.go | 4 +- pkg/util/kubernetes/util.go | 40 +++++++- test/build_manager_integration_test.go | 55 +++++++---- 12 files changed, 251 insertions(+), 205 deletions(-) diff --git a/pkg/apis/camel/v1alpha1/integration_types_support.go b/pkg/apis/camel/v1alpha1/integration_types_support.go index 2d9bff0..553a49c 100644 --- a/pkg/apis/camel/v1alpha1/integration_types_support.go +++ b/pkg/apis/camel/v1alpha1/integration_types_support.go @@ -25,6 +25,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// NewIntegration -- +func NewIntegration(namespace string, name string) Integration { + return Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: SchemeGroupVersion.String(), + Kind: IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } +} + // NewIntegrationList -- func NewIntegrationList() IntegrationList { return IntegrationList{ diff --git a/pkg/builder/builder.go b/pkg/builder/builder.go index 8cd03b1..9cd2bc6 100644 --- a/pkg/builder/builder.go +++ b/pkg/builder/builder.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "time" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" "github.com/sirupsen/logrus" @@ -38,11 +40,16 @@ import ( // // ******************************** +type buildTask struct { + handler func(Result) + request Request +} + type defaultBuilder struct { log *logrus.Entry ctx context.Context client client.Client - requests chan Request + tasks chan buildTask interrupt chan bool request sync.Map running int32 @@ -55,7 +62,7 @@ func New(ctx context.Context, c client.Client, namespace string) Builder { log: logrus.WithField("logger", "builder"), ctx: ctx, client: c, - requests: make(chan Request), + tasks: make(chan buildTask), interrupt: make(chan bool, 1), running: 0, namespace: namespace, @@ -64,8 +71,14 @@ func New(ctx context.Context, c client.Client, namespace string) Builder { return &m } +func (b *defaultBuilder) IsBuilding(object v1.ObjectMeta) bool { + _, ok := b.request.Load(object.Name) + + return ok +} + // Submit -- -func (b *defaultBuilder) Submit(request Request) Result { +func (b *defaultBuilder) Submit(request Request, handler func(Result)) { if atomic.CompareAndSwapInt32(&b.running, 0, 1) { go b.loop() } @@ -73,6 +86,7 @@ func (b *defaultBuilder) Submit(request Request) Result { result, present := b.request.Load(request.Meta.Name) if !present || result == nil { result = Result{ + Builder: b, Request: request, Status: StatusSubmitted, } @@ -80,15 +94,8 @@ func (b *defaultBuilder) Submit(request Request) Result { b.log.Infof("submitting request: %+v", request) b.request.Store(request.Meta.Name, result) - b.requests <- request + b.tasks <- buildTask{handler: handler, request: request} } - - return result.(Result) -} - -// Purge -- -func (b *defaultBuilder) Purge(request Request) { - b.request.Delete(request.Meta.Name) } // ******************************** @@ -104,19 +111,19 @@ func (b *defaultBuilder) loop() { b.interrupt <- true close(b.interrupt) - close(b.requests) + close(b.tasks) atomic.StoreInt32(&b.running, 0) - case r, ok := <-b.requests: + case t, ok := <-b.tasks: if ok { - b.log.Infof("executing request: %+v", r) - b.submit(r) + b.log.Infof("executing request: %+v", t.request) + b.process(t.request, t.handler) } } } } -func (b *defaultBuilder) submit(request Request) { +func (b *defaultBuilder) process(request Request, handler func(Result)) { result, present := b.request.Load(request.Meta.Name) if !present || result == nil { b.log.Panicf("no info found for: %+v", request.Meta.Name) @@ -127,6 +134,10 @@ func (b *defaultBuilder) submit(request Request) { r.Status = StatusStarted r.Task.StartedAt = time.Now() + if handler != nil { + handler(r) + } + // create tmp path buildDir := request.BuildDir if buildDir == "" { @@ -140,6 +151,7 @@ func (b *defaultBuilder) submit(request Request) { } defer os.RemoveAll(builderPath) + defer b.request.Delete(request.Meta.Name) c := Context{ C: b.ctx, @@ -164,8 +176,6 @@ func (b *defaultBuilder) submit(request Request) { // update the cache b.request.Store(request.Meta.Name, r) - - return } c.BaseImage = c.Image @@ -173,6 +183,14 @@ func (b *defaultBuilder) submit(request Request) { // update the cache b.request.Store(request.Meta.Name, r) + if r.Status == StatusError { + if handler != nil { + handler(r) + } + + return + } + // Sort steps by phase sort.SliceStable(request.Steps, func(i, j int) bool { return request.Steps[i].Phase() < request.Steps[j].Phase() @@ -224,7 +242,7 @@ func (b *defaultBuilder) submit(request Request) { // update the cache b.request.Store(request.Meta.Name, r) - b.log.Infof("request to build context %s executed in %f seconds", request.Meta.Name, r.Task.Elapsed().Seconds()) + b.log.Infof("build request %s executed in %f seconds", request.Meta.Name, r.Task.Elapsed().Seconds()) b.log.Infof("dependencies: %s", request.Dependencies) b.log.Infof("artifacts: %s", ArtifactIDs(c.Artifacts)) b.log.Infof("artifacts selected: %s", ArtifactIDs(c.SelectedArtifacts)) @@ -232,4 +250,8 @@ func (b *defaultBuilder) submit(request Request) { b.log.Infof("base image: %s", c.BaseImage) b.log.Infof("resolved image: %s", c.Image) b.log.Infof("resolved public image: %s", c.PublicImage) + + if handler != nil { + handler(r) + } } diff --git a/pkg/builder/builder_steps.go b/pkg/builder/builder_steps.go index 0da76e4..876647f 100644 --- a/pkg/builder/builder_steps.go +++ b/pkg/builder/builder_steps.go @@ -28,8 +28,6 @@ import ( "github.com/scylladb/go-set/strset" - "github.com/rs/xid" - "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/util/tar" @@ -39,8 +37,6 @@ import ( "github.com/apache/camel-k/pkg/util/maven" "github.com/apache/camel-k/version" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // GenerateProject -- @@ -338,75 +334,3 @@ func FindBestImage(images []PublishedImage, dependencies []string, artifacts []v return bestImage, bestImageCommonLibs } - -// NotifyIntegrationContext -- -func NotifyIntegrationContext(ctx *Context) error { - target := v1alpha1.IntegrationContext{ - TypeMeta: metav1.TypeMeta{ - Kind: v1alpha1.IntegrationContextKind, - APIVersion: v1alpha1.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: ctx.Namespace, - Name: ctx.Request.Meta.Name, - }, - } - key := k8sclient.ObjectKey{ - Namespace: ctx.Namespace, - Name: ctx.Request.Meta.Name, - } - - if err := ctx.Client.Get(ctx.C, key, &target); err != nil { - return err - } - - t := target.DeepCopy() - if t.Annotations == nil { - t.Annotations = make(map[string]string) - } - - // Add a random ID to trigger update - t.Annotations["camel.apache.org/build.id"] = xid.New().String() - - if err := ctx.Client.Update(ctx.C, t); err != nil { - return err - } - - return nil -} - -// NotifyIntegration -- -func NotifyIntegration(ctx *Context) error { - target := v1alpha1.Integration{ - TypeMeta: metav1.TypeMeta{ - Kind: v1alpha1.IntegrationKind, - APIVersion: v1alpha1.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: ctx.Namespace, - Name: ctx.Request.Meta.Name, - }, - } - key := k8sclient.ObjectKey{ - Namespace: ctx.Namespace, - Name: ctx.Request.Meta.Name, - } - - if err := ctx.Client.Get(ctx.C, key, &target); err != nil { - return err - } - - t := target.DeepCopy() - if t.Annotations == nil { - t.Annotations = make(map[string]string) - } - - // Add a random ID to trigger update - t.Annotations["camel.apache.org/build.id"] = xid.New().String() - - if err := ctx.Client.Update(ctx.C, t); err != nil { - return err - } - - return nil -} diff --git a/pkg/builder/builder_types.go b/pkg/builder/builder_types.go index 9247222..4f7f4a1 100644 --- a/pkg/builder/builder_types.go +++ b/pkg/builder/builder_types.go @@ -46,8 +46,8 @@ const ( // Builder -- type Builder interface { - Submit(request Request) Result - Purge(request Request) + IsBuilding(object v1.ObjectMeta) bool + Submit(request Request, handler func(Result)) } // Step -- @@ -124,6 +124,7 @@ func (t Task) Elapsed() time.Duration { // Result represents the result of a build type Result struct { + Builder Builder Request Request BaseImage string Image string diff --git a/pkg/builder/kaniko/kaniko.go b/pkg/builder/kaniko/kaniko.go index 4b2d826..33d8e4f 100644 --- a/pkg/builder/kaniko/kaniko.go +++ b/pkg/builder/kaniko/kaniko.go @@ -27,7 +27,6 @@ var DefaultSteps = []builder.Step{ builder.NewStep("build/compute-dependencies", builder.ProjectBuildPhase, builder.ComputeDependencies), builder.NewStep("packager", builder.ApplicationPackagePhase, builder.StandardPackager), builder.NewStep("publisher/kaniko", builder.ApplicationPublishPhase, Publisher), - builder.NewStep("notify/context", builder.NotifyPhase, builder.NotifyIntegrationContext), } // BuildDir is the directory where to build artifacts (shared with the Kaniko pod) diff --git a/pkg/builder/s2i/s2i.go b/pkg/builder/s2i/s2i.go index 8824e5d..24a19f2 100644 --- a/pkg/builder/s2i/s2i.go +++ b/pkg/builder/s2i/s2i.go @@ -27,5 +27,4 @@ var DefaultSteps = []builder.Step{ builder.NewStep("build/compute-dependencies", builder.ProjectBuildPhase, builder.ComputeDependencies), builder.NewStep("packager/incremental", builder.ApplicationPackagePhase, builder.IncrementalPackager), builder.NewStep("publisher/s2i", builder.ApplicationPublishPhase, Publisher), - builder.NewStep("notify/context", builder.NotifyPhase, builder.NotifyIntegrationContext), } diff --git a/pkg/controller/integration/build_image.go b/pkg/controller/integration/build_image.go index e1ec3f5..8cddf49 100644 --- a/pkg/controller/integration/build_image.go +++ b/pkg/controller/integration/build_image.go @@ -24,6 +24,8 @@ import ( "github.com/pkg/errors" + "github.com/apache/camel-k/pkg/util/kubernetes" + "github.com/apache/camel-k/pkg/util/digest" "github.com/apache/camel-k/pkg/trait" @@ -33,7 +35,6 @@ import ( "github.com/sirupsen/logrus" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) // NewBuildImageAction create an action that handles integration image build @@ -67,54 +68,74 @@ func (action *buildImageAction) Handle(ctx context.Context, integration *v1alpha // look-up the integration context associated to this integration, this is needed // to determine the base image - ictx := v1alpha1.NewIntegrationContext(integration.Namespace, integration.Status.Context) - ikey := k8sclient.ObjectKey{ - Namespace: integration.Namespace, - Name: integration.Status.Context, - } - if err := action.client.Get(ctx, ikey, &ictx); err != nil { - return errors.Wrapf(err, "unable to find integration context %s, %s", ikey.Name, err) + ictx, err := kubernetes.GetIntegrationContext(action.Context, action.client, integration.Status.Context, integration.Namespace) + if err != nil || ictx == nil { + return errors.Wrapf(err, "unable to find integration context %s, %s", integration.Status.Context, err) } b, err := platform.GetPlatformBuilder(action.Context, action.client, action.namespace) if err != nil { return err } - env, err := trait.Apply(ctx, action.client, integration, &ictx) - if err != nil { - return err - } - // This build do not require to determine dependencies nor a project, the builder - // step do remove them - r := builder.Request{ - Meta: integration.ObjectMeta, - Steps: env.Steps, - BuildDir: env.BuildDir, - Platform: env.Platform.Spec, - Image: ictx.Status.Image, - } + if !b.IsBuilding(ictx.ObjectMeta) { + env, err := trait.Apply(ctx, action.client, integration, ictx) + if err != nil { + return err + } - // Sources are added as part of the standard deployment bits - r.Resources = make([]builder.Resource, 0, len(integration.Spec.Sources)) + // This build do not require to determine dependencies nor a project, the builder + // step do remove them + r := builder.Request{ + Meta: integration.ObjectMeta, + Steps: env.Steps, + BuildDir: env.BuildDir, + Platform: env.Platform.Spec, + Image: ictx.Status.Image, + // Sources are added as part of the standard deployment bits + Resources: make([]builder.Resource, 0, len(integration.Spec.Sources)), + } - for _, source := range integration.Spec.Sources { - r.Resources = append(r.Resources, builder.Resource{ - Content: []byte(source.Content), - Target: path.Join("sources", source.Name), - }) - } - for _, resource := range integration.Spec.Resources { - if resource.Type != v1alpha1.ResourceTypeData { - continue + // TODO: handle generated sources + // TODO: handle compressed sources + for _, source := range integration.Spec.Sources { + r.Resources = append(r.Resources, builder.Resource{ + Content: []byte(source.Content), + Target: path.Join("sources", source.Name), + }) + } + // TODO: handle compressed resources + for _, resource := range integration.Spec.Resources { + if resource.Type != v1alpha1.ResourceTypeData { + continue + } + r.Resources = append(r.Resources, builder.Resource{ + Content: []byte(resource.Content), + Target: path.Join("resources", resource.Name), + }) } - r.Resources = append(r.Resources, builder.Resource{ - Content: []byte(resource.Content), - Target: path.Join("resources", resource.Name), + + b.Submit(r, func(result builder.Result) { + // + // this function is invoked synchronously for every state change + // + if err := action.handleBuildStateChange(result); err != nil { + logrus.Warnf("Error while building integration image %s, reason: %s", ictx.Name, err.Error()) + } }) } - res := b.Submit(r) + return nil +} + +func (action *buildImageAction) handleBuildStateChange(res builder.Result) error { + // + // Get the latest status of the integration + // + target, err := kubernetes.GetIntegration(action.Context, action.client, res.Request.Meta.Name, res.Request.Meta.Namespace) + if err != nil || target == nil { + return err + } switch res.Status { case builder.StatusSubmitted: @@ -122,17 +143,14 @@ func (action *buildImageAction) Handle(ctx context.Context, integration *v1alpha case builder.StatusStarted: logrus.Info("Build started") case builder.StatusError: - target := integration.DeepCopy() + target := target.DeepCopy() target.Status.Phase = v1alpha1.IntegrationPhaseError logrus.Infof("Integration %s transitioning to state %s, reason: %s", target.Name, target.Status.Phase, res.Error.Error()) - // remove the build from cache - defer b.Purge(r) - - return action.client.Update(ctx, target) + return action.client.Update(action.Context, target) case builder.StatusCompleted: - target := integration.DeepCopy() + target := target.DeepCopy() target.Status.Phase = v1alpha1.IntegrationPhaseDeploying if res.PublicImage != "" { target.Status.Image = res.PublicImage @@ -140,7 +158,7 @@ func (action *buildImageAction) Handle(ctx context.Context, integration *v1alpha target.Status.Image = res.Image } - dgst, err := digest.ComputeForIntegration(integration) + dgst, err := digest.ComputeForIntegration(target) if err != nil { return err } @@ -149,10 +167,7 @@ func (action *buildImageAction) Handle(ctx context.Context, integration *v1alpha logrus.Info("Integration ", target.Name, " transitioning to state ", target.Status.Phase) - // remove the build from cache - defer b.Purge(r) - - if err := action.client.Update(ctx, target); err != nil { + if err := action.client.Update(action.Context, target); err != nil { return err } } diff --git a/pkg/controller/integrationcontext/build.go b/pkg/controller/integrationcontext/build.go index f24c35f..a910bbb 100644 --- a/pkg/controller/integrationcontext/build.go +++ b/pkg/controller/integrationcontext/build.go @@ -20,6 +20,8 @@ package integrationcontext import ( "context" + "github.com/apache/camel-k/pkg/util/kubernetes" + "github.com/apache/camel-k/pkg/trait" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -27,6 +29,7 @@ import ( "github.com/apache/camel-k/pkg/platform" "github.com/sirupsen/logrus" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -51,51 +54,73 @@ func (action *buildAction) CanHandle(ictx *v1alpha1.IntegrationContext) bool { } func (action *buildAction) Handle(ctx context.Context, ictx *v1alpha1.IntegrationContext) error { - p, err := platform.GetCurrentPlatform(ctx, action.client, ictx.Namespace) - if err != nil { - return err - } b, err := platform.GetPlatformBuilder(action.Context, action.client, ictx.Namespace) if err != nil { return err } - env, err := trait.Apply(ctx, action.client, nil, ictx) - if err != nil { - return err + + if !b.IsBuilding(ictx.ObjectMeta) { + p, err := platform.GetCurrentPlatform(ctx, action.client, ictx.Namespace) + if err != nil { + return err + } + env, err := trait.Apply(ctx, action.client, nil, ictx) + if err != nil { + return err + } + + // assume there's no duplication nor conflict for now + repositories := make([]string, 0, len(ictx.Spec.Repositories)+len(p.Spec.Build.Repositories)) + repositories = append(repositories, ictx.Spec.Repositories...) + repositories = append(repositories, p.Spec.Build.Repositories...) + + r := builder.Request{ + Meta: ictx.ObjectMeta, + Dependencies: ictx.Spec.Dependencies, + Repositories: repositories, + Steps: env.Steps, + BuildDir: env.BuildDir, + Platform: env.Platform.Spec, + } + + b.Submit(r, func(result builder.Result) { + // + // this function is invoked synchronously for every state change to avoid + // leaving one context not fully updated when the incremental builder search + // for a compatible/base image + // + if err := action.handleBuildStateChange(result); err != nil { + logrus.Warnf("Error while building context %s, reason: %s", ictx.Name, err.Error()) + } + }) } - // assume there's no duplication nor conflict for now - repositories := make([]string, 0, len(ictx.Spec.Repositories)+len(p.Spec.Build.Repositories)) - repositories = append(repositories, ictx.Spec.Repositories...) - repositories = append(repositories, p.Spec.Build.Repositories...) - - r := builder.Request{ - Meta: ictx.ObjectMeta, - Dependencies: ictx.Spec.Dependencies, - Repositories: repositories, - Steps: env.Steps, - BuildDir: env.BuildDir, - Platform: env.Platform.Spec, + return nil +} + +func (action *buildAction) handleBuildStateChange(res builder.Result) error { + // + // Get the latest status of the context + // + target, err := kubernetes.GetIntegrationContext(action.Context, action.client, res.Request.Meta.Name, res.Request.Meta.Namespace) + if err != nil || target == nil { + return err } - res := b.Submit(r) switch res.Status { case builder.StatusSubmitted: - logrus.Info("Build submitted") + logrus.Infof("Build submitted for IntegrationContext %s", target.Name) case builder.StatusStarted: - logrus.Info("Build started") + logrus.Infof("Build started for IntegrationContext %s", target.Name) case builder.StatusError: - target := ictx.DeepCopy() + target = target.DeepCopy() target.Status.Phase = v1alpha1.IntegrationContextPhaseError logrus.Infof("Context %s transitioning to state %s, reason: %s", target.Name, target.Status.Phase, res.Error.Error()) - // remove the build from cache - defer b.Purge(r) - - return action.client.Update(ctx, target) + return action.client.Update(action.Context, target) case builder.StatusCompleted: - target := ictx.DeepCopy() + target = target.DeepCopy() target.Status.BaseImage = res.BaseImage target.Status.Image = res.Image target.Status.PublicImage = res.PublicImage @@ -112,13 +137,11 @@ func (action *buildAction) Handle(ctx context.Context, ictx *v1alpha1.Integratio } logrus.Info("Context ", target.Name, " transitioning to state ", target.Status.Phase) - - // remove the build from cache - defer b.Purge(r) - - if err := action.client.Update(ctx, target); err != nil { + if err := action.client.Update(action.Context, target); err != nil { return err } + + logrus.Infof("Inform integrations about context %s state change", target.Name) if err := action.informIntegrations(target); err != nil { return err } diff --git a/pkg/trait/builder.go b/pkg/trait/builder.go index 0cb22fc..89443c3 100644 --- a/pkg/trait/builder.go +++ b/pkg/trait/builder.go @@ -78,7 +78,6 @@ func (t *builderTrait) Apply(e *Environment) error { e.Steps = []builder.Step{ builder.NewStep("packager", builder.ApplicationPackagePhase, builder.StandardPackager), builder.NewStep("publisher/s2i", builder.ApplicationPublishPhase, s2i.Publisher), - builder.NewStep("notify/integration", builder.NotifyPhase, builder.NotifyIntegration), } if e.DetermineProfile() == v1alpha1.TraitProfileKnative { e.Steps = append(e.Steps, builder.NewStep("publisher/replaceHost", builder.ApplicationPublishPhase+1, t.ReplaceHost)) @@ -87,7 +86,6 @@ func (t *builderTrait) Apply(e *Environment) error { e.Steps = []builder.Step{ builder.NewStep("packager", builder.ApplicationPackagePhase, builder.StandardPackager), builder.NewStep("publisher/kaniko", builder.ApplicationPublishPhase, kaniko.Publisher), - builder.NewStep("notify/integration", builder.NotifyPhase, builder.NotifyIntegration), } e.BuildDir = kaniko.BuildDir } diff --git a/pkg/trait/builder_test.go b/pkg/trait/builder_test.go index 1afefab..95b6b5c 100644 --- a/pkg/trait/builder_test.go +++ b/pkg/trait/builder_test.go @@ -82,7 +82,7 @@ func TestS2IBuilderTrait(t *testing.T) { assert.NotEmpty(t, env.ExecutedTraits) assert.NotNil(t, env.GetTrait(ID("builder"))) assert.NotEmpty(t, env.Steps) - assert.Len(t, env.Steps, 5) + assert.Len(t, env.Steps, 4) assert.Condition(t, func() bool { for _, s := range env.Steps { if s.ID() == "publisher/s2i" && s.Phase() == builder.ApplicationPublishPhase { @@ -102,7 +102,7 @@ func TestKanikoBuilderTrait(t *testing.T) { assert.NotEmpty(t, env.ExecutedTraits) assert.NotNil(t, env.GetTrait(ID("builder"))) assert.NotEmpty(t, env.Steps) - assert.Len(t, env.Steps, 5) + assert.Len(t, env.Steps, 4) assert.Condition(t, func() bool { for _, s := range env.Steps { if s.ID() == "publisher/kaniko" && s.Phase() == builder.ApplicationPublishPhase { diff --git a/pkg/util/kubernetes/util.go b/pkg/util/kubernetes/util.go index acf8b22..19c89cc 100644 --- a/pkg/util/kubernetes/util.go +++ b/pkg/util/kubernetes/util.go @@ -21,6 +21,8 @@ import ( "context" "fmt" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/pkg/client" "gopkg.in/yaml.v2" @@ -68,7 +70,7 @@ func GetConfigMap(context context.Context, client client.Client, name string, na Namespace: namespace, } - cm := corev1.ConfigMap{ + answer := corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{ Kind: "ConfigMap", APIVersion: "v1", @@ -79,9 +81,41 @@ func GetConfigMap(context context.Context, client client.Client, name string, na }, } - if err := client.Get(context, key, &cm); err != nil { + if err := client.Get(context, key, &answer); err != nil { + return nil, err + } + + return &answer, nil +} + +// GetIntegrationContext -- +func GetIntegrationContext(context context.Context, client client.Client, name string, namespace string) (*v1alpha1.IntegrationContext, error) { + key := k8sclient.ObjectKey{ + Name: name, + Namespace: namespace, + } + + answer := v1alpha1.NewIntegrationContext(namespace, name) + + if err := client.Get(context, key, &answer); err != nil { + return nil, err + } + + return &answer, nil +} + +// GetIntegration -- +func GetIntegration(context context.Context, client client.Client, name string, namespace string) (*v1alpha1.Integration, error) { + key := k8sclient.ObjectKey{ + Name: name, + Namespace: namespace, + } + + answer := v1alpha1.NewIntegration(namespace, name) + + if err := client.Get(context, key, &answer); err != nil { return nil, err } - return &cm, nil + return &answer, nil } diff --git a/test/build_manager_integration_test.go b/test/build_manager_integration_test.go index 8dc7413..44ed850 100644 --- a/test/build_manager_integration_test.go +++ b/test/build_manager_integration_test.go @@ -22,6 +22,7 @@ limitations under the License. package test import ( + "fmt" "testing" "time" @@ -52,21 +53,29 @@ func TestBuildManagerBuild(t *testing.T) { "mvn:org.apache.camel/camel-core", "camel:telegram", }, - // to not include notify step - Steps: s2i.DefaultSteps[:len(s2i.DefaultSteps)-1], + Steps: s2i.DefaultSteps, } - b.Submit(r) + c := make(chan builder.Result) + + b.Submit(r, func(res builder.Result) { + c <- res + }) - deadline := time.Now().Add(5 * time.Minute) var result builder.Result - for time.Now().Before(deadline) { - result = b.Submit(r) - if result.Status == builder.StatusCompleted || result.Status == builder.StatusError { - break +loop: + for { + select { + case res := <-c: + if res.Status == builder.StatusCompleted || res.Status == builder.StatusError { + result = res + break loop + } + case <-time.After(5 * time.Minute): + fmt.Println("timeout 1") + break loop } - time.Sleep(2 * time.Second) } assert.NotEqual(t, builder.StatusError, result.Status) @@ -86,26 +95,34 @@ func TestBuildManagerFailedBuild(t *testing.T) { Platform: v1alpha1.IntegrationPlatformSpec{ Build: v1alpha1.IntegrationPlatformBuildSpec{ CamelVersion: "2.23.1", - BaseImage: "fabric8/s2i-java:3.0-java8", + BaseImage: "docker.io/fabric8/s2i-java:3.0-java8", }, }, Dependencies: []string{ "mvn:org.apache.camel/camel-cippalippa", }, - // to not include notify step - Steps: s2i.DefaultSteps[:len(s2i.DefaultSteps)-1], + Steps: s2i.DefaultSteps, } - b.Submit(r) + c := make(chan builder.Result) + + b.Submit(r, func(res builder.Result) { + c <- res + }) - deadline := time.Now().Add(5 * time.Minute) var result builder.Result - for time.Now().Before(deadline) { - result = b.Submit(r) - if result.Status == builder.StatusCompleted || result.Status == builder.StatusError { - break +loop: + for { + select { + case res := <-c: + if res.Status == builder.StatusCompleted || res.Status == builder.StatusError { + result = res + break loop + } + case <-time.After(5 * time.Minute): + fmt.Println("timeout 1") + break loop } - time.Sleep(2 * time.Second) } assert.Equal(t, builder.StatusError, result.Status)