This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push: new 5e7a96387 Add preliminary support for the resume API 5e7a96387 is described below commit 5e7a963876c24d788903719bd6b4b568247f9fd1 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue May 17 11:29:01 2022 +0200 Add preliminary support for the resume API --- addons/register_resume.go | 27 ++++++++ addons/resume/resume.go | 126 ++++++++++++++++++++++++++++++++++++++ pkg/apis/camel/v1/common_types.go | 2 + pkg/util/defaults/defaults.go | 2 +- 4 files changed, 156 insertions(+), 1 deletion(-) diff --git a/addons/register_resume.go b/addons/register_resume.go new file mode 100644 index 000000000..5b5f60947 --- /dev/null +++ b/addons/register_resume.go @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package addons + +import ( + "github.com/apache/camel-k/addons/resume" + "github.com/apache/camel-k/pkg/trait" +) + +func init() { + trait.AddToTraits(resume.NewResumeTrait) +} diff --git a/addons/resume/resume.go b/addons/resume/resume.go new file mode 100644 index 000000000..de590aa65 --- /dev/null +++ b/addons/resume/resume.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package resume + +import ( + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + traitv1 "github.com/apache/camel-k/pkg/apis/camel/v1/trait" + "github.com/apache/camel-k/pkg/metadata" + "github.com/apache/camel-k/pkg/trait" + "github.com/apache/camel-k/pkg/util" + "github.com/apache/camel-k/pkg/util/kubernetes" + "github.com/apache/camel-k/pkg/util/log" + "k8s.io/utils/pointer" +) + +// The Resume trait can be used to manage and configure resume strategies. +// +// This feature is meant to allow quick resume of processing by Camel K instances after they have been restarted. This +// is an experimental implementation based on the support available on Camel Core: +// https://camel.apache.org/components/next/eips/resume-strategies.html. +// +// The Resume trait is disabled by default. 
+// +// The main difference from the implementation on Core is that it's not necessary to bind the strategies to the +// registry. This step will be done automatically by Camel K, after resolving the options passed to the trait. +// +// A sample execution of this trait, using the Kafka backend (the only one supported at the moment), would require +// the following trait options: +// -t resume.enabled=true -t resume.resume-path=camel-file-sets -t resume.resume-server="address-of-your-kafka:9092" +// +// +camel-k:trait=resume. +type Trait struct { + traitv1.Trait `property:",squash"` + // Enables automatic configuration of the trait. + Auto *bool `property:"auto" json:"auto,omitempty"` + // The type of the resume strategy to use + ResumeStrategy string `property:"resume-strategy,omitempty"` + // The path used by the resume strategy (this is specific to the resume strategy type) + ResumePath string `property:"resume-path,omitempty"` + // The address of the resume server to use (protocol / implementation specific) + ResumeServer string `property:"resume-server,omitempty"` + // The adapter-specific policy to use when filling the cache (use: minimizing / maximizing). 
Check + // the component documentation if unsure + CacheFillPolicy string `property:"cache-fill-policy,omitempty"` +} + +type resumeTrait struct { + trait.BaseTrait + Trait `property:",squash"` +} + +const ( + KafkaSingle = "org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy" + StrategyPath = "camel-k-offsets" +) + +func NewResumeTrait() trait.Trait { + return &resumeTrait{ + BaseTrait: trait.NewBaseTrait("resume", trait.TraitOrderBeforeControllerCreation), + } +} + +func (r *resumeTrait) Configure(environment *trait.Environment) (bool, error) { + if !pointer.BoolDeref(r.Enabled, false) { + return false, nil + } + + if !environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !environment.IntegrationInRunningPhases() { + return false, nil + } + + if pointer.BoolDeref(r.Auto, true) { + // Check which components have been used + sources, err := kubernetes.ResolveIntegrationSources(environment.Ctx, r.Client, environment.Integration, environment.Resources) + if err != nil { + return false, err + } + + meta := metadata.ExtractAll(environment.CamelCatalog, sources) + + for _, endpoint := range meta.FromURIs { + log.Infof("Processing component %s", endpoint) + } + + if r.ResumeStrategy == "" { + r.ResumeStrategy = KafkaSingle + } + + if r.ResumePath == "" { + r.ResumePath = StrategyPath + } + } + + return r.Enabled != nil && *r.Enabled, nil +} + +func (r *resumeTrait) Apply(environment *trait.Environment) error { + if environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) { + util.StringSliceUniqueAdd(&environment.Integration.Status.Capabilities, v1.CapabilityResumeKafka) + } + + if environment.IntegrationInRunningPhases() { + environment.ApplicationProperties["customizer.resume.enabled"] = "true" + environment.ApplicationProperties["customizer.resume.resumeStrategy"] = r.ResumeStrategy + environment.ApplicationProperties["customizer.resume.resumePath"] = r.ResumePath + 
environment.ApplicationProperties["customizer.resume.resumeServer"] = r.ResumeServer + environment.ApplicationProperties["customizer.resume.cacheFillPolicy"] = r.CacheFillPolicy + } + + return nil +} diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go index 660ab9a60..4a585256c 100644 --- a/pkg/apis/camel/v1/common_types.go +++ b/pkg/apis/camel/v1/common_types.go @@ -304,6 +304,8 @@ const ( CapabilityTracing = "tracing" // CapabilityMaster defines the master capability CapabilityMaster = "master" + // CapabilityResumeKafka defines the resume capability + CapabilityResumeKafka = "resume-kafka" ) // +kubebuilder:object:generate=false diff --git a/pkg/util/defaults/defaults.go b/pkg/util/defaults/defaults.go index 06b6a20b2..0775adc62 100644 --- a/pkg/util/defaults/defaults.go +++ b/pkg/util/defaults/defaults.go @@ -47,5 +47,5 @@ const ( installDefaultKamelets = true ) -// GitCommit must be provided during application build +//GitCommit must be provided during application build var GitCommit string