This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e7a96387 Add preliminary support for the resume API
5e7a96387 is described below

commit 5e7a963876c24d788903719bd6b4b568247f9fd1
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue May 17 11:29:01 2022 +0200

    Add preliminary support for the resume API
---
 addons/register_resume.go         |  27 ++++++++
 addons/resume/resume.go           | 126 ++++++++++++++++++++++++++++++++++++++
 pkg/apis/camel/v1/common_types.go |   2 +
 pkg/util/defaults/defaults.go     |   2 +-
 4 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/addons/register_resume.go b/addons/register_resume.go
new file mode 100644
index 000000000..5b5f60947
--- /dev/null
+++ b/addons/register_resume.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package addons
+
+import (
+       "github.com/apache/camel-k/addons/resume"
+       "github.com/apache/camel-k/pkg/trait"
+)
+
+func init() {
+       trait.AddToTraits(resume.NewResumeTrait)
+}
diff --git a/addons/resume/resume.go b/addons/resume/resume.go
new file mode 100644
index 000000000..de590aa65
--- /dev/null
+++ b/addons/resume/resume.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package resume
+
+import (
+       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       traitv1 "github.com/apache/camel-k/pkg/apis/camel/v1/trait"
+       "github.com/apache/camel-k/pkg/metadata"
+       "github.com/apache/camel-k/pkg/trait"
+       "github.com/apache/camel-k/pkg/util"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
+       "github.com/apache/camel-k/pkg/util/log"
+       "k8s.io/utils/pointer"
+)
+
+// The Resume trait can be used to manage and configure resume strategies.
+//
+// This feature is meant to allow quick resume of processing by Camel K 
instances after they have been restarted. This
+// is an experimental implementation based on the support available on Camel 
Core:
+// https://camel.apache.org/components/next/eips/resume-strategies.html.
+//
+// The Resume trait is disabled by default.
+//
+// The main different from the implementation on Core is that it's not 
necessary to bind the strategies to the
+// registry. This step will be done automatically by Camel K, after resolving 
the options passed to the trait.
+//
+// A sample execution of this trait, using the Kafka backend (the only one 
supported at the moment), would require
+// the following trait options:
+// -t resume.enabled=true -t resume.resume-path=camel-file-sets -t 
resume.resume-server="address-of-your-kafka:9092"
+//
+// +camel-k:trait=resume.
+type Trait struct {
+       traitv1.Trait `property:",squash"`
+       // Enables automatic configuration of the trait.
+       Auto *bool `property:"auto" json:"auto,omitempty"`
+       // The type of the resume strategy to use
+       ResumeStrategy string `property:"resume-strategy,omitempty"`
+       // The path used by the resume strategy (this is specific to the resume 
strategy type)
+       ResumePath string `property:"resume-path,omitempty"`
+       // The address of the resume server to use (protocol / implementation 
specific)
+       ResumeServer string `property:"resume-server,omitempty"`
+       // The adapter-specific policy to use when filling the cache (use: 
minimizing / maximizing). Check
+       // the component documentation if unsure
+       CacheFillPolicy string `property:"cache-fill-policy,omitempty"`
+}
+
+type resumeTrait struct {
+       trait.BaseTrait
+       Trait `property:",squash"`
+}
+
+const (
+       KafkaSingle  = 
"org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy"
+       StrategyPath = "camel-k-offsets"
+)
+
+func NewResumeTrait() trait.Trait {
+       return &resumeTrait{
+               BaseTrait: trait.NewBaseTrait("resume", 
trait.TraitOrderBeforeControllerCreation),
+       }
+}
+
+func (r *resumeTrait) Configure(environment *trait.Environment) (bool, error) {
+       if !pointer.BoolDeref(r.Enabled, false) {
+               return false, nil
+       }
+
+       if !environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) 
&& !environment.IntegrationInRunningPhases() {
+               return false, nil
+       }
+
+       if pointer.BoolDeref(r.Auto, true) {
+               // Check which components have been used
+               sources, err := 
kubernetes.ResolveIntegrationSources(environment.Ctx, r.Client, 
environment.Integration, environment.Resources)
+               if err != nil {
+                       return false, err
+               }
+
+               meta := metadata.ExtractAll(environment.CamelCatalog, sources)
+
+               for _, endpoint := range meta.FromURIs {
+                       log.Infof("Processing component %s", endpoint)
+               }
+
+               if r.ResumeStrategy == "" {
+                       r.ResumeStrategy = KafkaSingle
+               }
+
+               if r.ResumePath == "" {
+                       r.ResumePath = StrategyPath
+               }
+       }
+
+       return r.Enabled != nil && *r.Enabled, nil
+}
+
+func (r *resumeTrait) Apply(environment *trait.Environment) error {
+       if environment.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+               
util.StringSliceUniqueAdd(&environment.Integration.Status.Capabilities, 
v1.CapabilityResumeKafka)
+       }
+
+       if environment.IntegrationInRunningPhases() {
+               environment.ApplicationProperties["customizer.resume.enabled"] 
= "true"
+               
environment.ApplicationProperties["customizer.resume.resumeStrategy"] = 
r.ResumeStrategy
+               
environment.ApplicationProperties["customizer.resume.resumePath"] = r.ResumePath
+               
environment.ApplicationProperties["customizer.resume.resumeServer"] = 
r.ResumeServer
+               
environment.ApplicationProperties["customizer.resume.cacheFillPolicy"] = 
r.CacheFillPolicy
+       }
+
+       return nil
+}
diff --git a/pkg/apis/camel/v1/common_types.go 
b/pkg/apis/camel/v1/common_types.go
index 660ab9a60..4a585256c 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -304,6 +304,8 @@ const (
        CapabilityTracing = "tracing"
        // CapabilityMaster defines the master capability
        CapabilityMaster = "master"
+       // CapabilityResumeKafka defines the resume capability
+       CapabilityResumeKafka = "resume-kafka"
 )
 
 // +kubebuilder:object:generate=false
diff --git a/pkg/util/defaults/defaults.go b/pkg/util/defaults/defaults.go
index 06b6a20b2..0775adc62 100644
--- a/pkg/util/defaults/defaults.go
+++ b/pkg/util/defaults/defaults.go
@@ -47,5 +47,5 @@ const (
        installDefaultKamelets = true
 )
 
-// GitCommit must be provided during application build
+//GitCommit must be provided during application build
 var GitCommit string

Reply via email to