This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
     new a71debb  Use non-caching client in builds scheduling critical section
a71debb is described below

commit a71debbcf59fbdbfb285a26b896dcce467171818
Author: Antonin Stefanutti <anto...@stefanutti.fr>
AuthorDate: Fri Apr 19 13:43:41 2019 +0200

    Use non-caching client in builds scheduling critical section
---
 pkg/controller/build/build_controller.go | 29 ++++++++++++++++++++++++-----
 pkg/controller/build/schedule_pod.go     | 13 +++++++++----
 pkg/controller/build/schedule_routine.go |  8 ++++++--
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pkg/controller/build/build_controller.go b/pkg/controller/build/build_controller.go
index c6deca9..b9d23a2 100644
--- a/pkg/controller/build/build_controller.go
+++ b/pkg/controller/build/build_controller.go
@@ -10,6 +10,7 @@ import (
 
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
+	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -30,16 +31,33 @@ func Add(mgr manager.Manager) error {
 	if err != nil {
 		return err
 	}
-	return add(mgr, newReconciler(mgr, c))
+	reconciler, err := newReconciler(mgr, c)
+	if err != nil {
+		return err
+	}
+	return add(mgr, reconciler)
 }
 
 // newReconciler returns a new reconcile.Reconciler
-func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler {
+func newReconciler(mgr manager.Manager, c client.Client) (reconcile.Reconciler, error) {
+	// Non-caching client to be used whenever caching may cause race conditions,
+	// like in the builds scheduling critical section.
+	// TODO: to be replaced with Manager.GetAPIReader() as soon as it's available, see:
+	// https://github.com/kubernetes-sigs/controller-runtime/pull/327
+	clientOptions := k8sclient.Options{
+		Scheme: mgr.GetScheme(),
+	}
+	reader, err := k8sclient.New(mgr.GetConfig(), clientOptions)
+	if err != nil {
+		return nil, err
+	}
+
 	return &ReconcileBuild{
 		client:  c,
+		reader:  reader,
 		scheme:  mgr.GetScheme(),
 		builder: builder.New(c),
-	}
+	}, nil
 }
 
 // add adds a new Controller to mgr with r as the reconcile.Reconciler
@@ -96,6 +114,7 @@ type ReconcileBuild struct {
 	// This client, initialized using mgr.Client() above, is a split client
 	// that reads objects from the cache and writes to the apiserver
 	client   client.Client
+	reader   k8sclient.Reader
 	scheme   *runtime.Scheme
 	builder  builder.Builder
 	routines sync.Map
@@ -128,8 +147,8 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
 
 	buildActionPool := []Action{
 		NewInitializeAction(),
-		NewScheduleRoutineAction(r.builder, &r.routines),
-		NewSchedulePodAction(),
+		NewScheduleRoutineAction(r.reader, r.builder, &r.routines),
+		NewSchedulePodAction(r.reader),
 		NewMonitorRoutineAction(&r.routines),
 		NewMonitorPodAction(),
 		NewErrorRecoveryAction(),
diff --git a/pkg/controller/build/schedule_pod.go b/pkg/controller/build/schedule_pod.go
index fef4f84..8bbb7c9 100644
--- a/pkg/controller/build/schedule_pod.go
+++ b/pkg/controller/build/schedule_pod.go
@@ -35,13 +35,16 @@ import (
 )
 
 // NewSchedulePodAction creates a new schedule action
-func NewSchedulePodAction() Action {
-	return &schedulePodAction{}
+func NewSchedulePodAction(reader k8sclient.Reader) Action {
+	return &schedulePodAction{
+		reader: reader,
+	}
 }
 
 type schedulePodAction struct {
 	baseAction
-	lock sync.Mutex
+	lock   sync.Mutex
+	reader k8sclient.Reader
 }
 
 // Name returns a common name of the action
@@ -63,7 +66,9 @@ func (action *schedulePodAction) Handle(ctx context.Context, build *v1alpha1.Bui
 
 	builds := &v1alpha1.BuildList{}
 	options :=
&k8sclient.ListOptions{Namespace: build.Namespace}
-	err := action.client.List(ctx, options, builds)
+	// We use the non-caching client as informers cache is not invalidated nor updated
+	// atomically by write operations
+	err := action.reader.List(ctx, options, builds)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/controller/build/schedule_routine.go b/pkg/controller/build/schedule_routine.go
index 9cac35b..e171601 100644
--- a/pkg/controller/build/schedule_routine.go
+++ b/pkg/controller/build/schedule_routine.go
@@ -28,8 +28,9 @@ import (
 )
 
 // NewScheduleRoutineAction creates a new schedule routine action
-func NewScheduleRoutineAction(b builder.Builder, r *sync.Map) Action {
+func NewScheduleRoutineAction(reader k8sclient.Reader, b builder.Builder, r *sync.Map) Action {
 	return &scheduleRoutineAction{
+		reader:   reader,
 		builder:  b,
 		routines: r,
 	}
@@ -38,6 +39,7 @@ func NewScheduleRoutineAction(b builder.Builder, r *sync.Map) Action {
 type scheduleRoutineAction struct {
 	baseAction
 	lock     sync.Mutex
+	reader   k8sclient.Reader
 	builder  builder.Builder
 	routines *sync.Map
 }
@@ -61,7 +63,9 @@ func (action *scheduleRoutineAction) Handle(ctx context.Context, build *v1alpha1
 
 	builds := &v1alpha1.BuildList{}
 	options := &k8sclient.ListOptions{Namespace: build.Namespace}
-	err := action.client.List(ctx, options, builds)
+	// We use the non-caching client as informers cache is not invalidated nor updated
+	// atomically by write operations
+	err := action.reader.List(ctx, options, builds)
 	if err != nil {
 		return err
 	}