This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit d07ea17312f0134e3bf12bbd9a34c9a360888cd3 Author: Nicola Ferraro <ni.ferr...@gmail.com> AuthorDate: Fri Jul 24 17:38:07 2020 +0200 Fix #1548: initial work on sinkbinding --- ...yaml => camel-catalog-1.5.0-SNAPSHOT-main.yaml} | 6 +- ...l => camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml} | 6 +- ...el-k.v1.1.0-snapshot.clusterserviceversion.yaml | 1 + deploy/operator-role-knative.yaml | 1 + deploy/resources.go | 24 +-- examples/knative/messages-channel.yaml | 2 +- examples/knative/words-channel.yaml | 2 +- go.sum | 2 + helm/camel-k/templates/operator-role.yaml | 1 + pkg/apis/addtoscheme_knative_eventing.go | 2 + pkg/trait/knative.go | 178 +++++++++++++++------ pkg/util/defaults/defaults.go | 2 +- pkg/util/knative/knative.go | 39 ++++- pkg/util/kubernetes/collection.go | 34 ++++ script/Makefile | 4 +- 15 files changed, 235 insertions(+), 69 deletions(-) diff --git a/deploy/camel-catalog-1.4.1-main.yaml b/deploy/camel-catalog-1.5.0-SNAPSHOT-main.yaml similarity index 99% rename from deploy/camel-catalog-1.4.1-main.yaml rename to deploy/camel-catalog-1.5.0-SNAPSHOT-main.yaml index c7f9361..1ce39cb 100644 --- a/deploy/camel-catalog-1.4.1-main.yaml +++ b/deploy/camel-catalog-1.5.0-SNAPSHOT-main.yaml @@ -18,16 +18,16 @@ apiVersion: camel.apache.org/v1 kind: CamelCatalog metadata: - name: camel-catalog-1.4.1-main + name: camel-catalog-1.5.0-snapshot-main labels: app: camel-k camel.apache.org/catalog.version: 3.4.0 camel.apache.org/catalog.loader.version: 3.4.0 - camel.apache.org/runtime.version: 1.4.1 + camel.apache.org/runtime.version: 1.5.0-SNAPSHOT camel.apache.org/runtime.provider: main spec: runtime: - version: 1.4.1 + version: 1.5.0-SNAPSHOT provider: main applicationClass: org.apache.camel.k.main.Application metadata: diff --git a/deploy/camel-catalog-1.4.1-quarkus.yaml b/deploy/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml similarity index 99% rename from deploy/camel-catalog-1.4.1-quarkus.yaml rename to deploy/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml index e8cde54..113288e 100644 --- a/deploy/camel-catalog-1.4.1-quarkus.yaml +++ b/deploy/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml @@ -18,16 +18,16 @@ apiVersion: camel.apache.org/v1 kind: CamelCatalog metadata: - name: camel-catalog-1.4.1-quarkus + name: camel-catalog-1.5.0-snapshot-quarkus labels: app: camel-k camel.apache.org/catalog.version: 3.4.0 camel.apache.org/catalog.loader.version: 3.4.0 - camel.apache.org/runtime.version: 1.4.1 + camel.apache.org/runtime.version: 1.5.0-SNAPSHOT camel.apache.org/runtime.provider: quarkus spec: runtime: - version: 1.4.1 + version: 1.5.0-SNAPSHOT provider: quarkus applicationClass: io.quarkus.runner.GeneratedMain metadata: diff --git a/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml b/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml index 4cc81c8..d761fee 100644 --- a/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml +++ b/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml @@ -411,6 +411,7 @@ spec: - apiGroups: - eventing.knative.dev - messaging.knative.dev + - sources.knative.dev resources: - '*' verbs: diff --git a/deploy/operator-role-knative.yaml b/deploy/operator-role-knative.yaml index 47ed6c6..36c6a44 100644 --- a/deploy/operator-role-knative.yaml +++ b/deploy/operator-role-knative.yaml @@ -38,6 +38,7 @@ rules: - apiGroups: - eventing.knative.dev - messaging.knative.dev + - sources.knative.dev resources: - "*" verbs: diff --git a/deploy/resources.go b/deploy/resources.go index c43e536..e1e547b 100644 --- a/deploy/resources.go +++ b/deploy/resources.go @@ -88,19 +88,19 @@ var assets = func() http.FileSystem { compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x53\x3d\x6f\xdb\x30\x14\xdc\xf9\x2b\x0e\xd6\x92\x00\xfe\x68\x3b\xba\x93\x9a\xd8\xa8\xd0\xc0\x06\x22\xa7\x41\xc6\x67\xf1\x59\x7a\x08\x45\xaa\x24\x15\xc5\xff\xbe\xa0\x6c\x37\x09\xba\x86\x9b\xa0\xd3\x7d\xf0\x4e\x19\x66\x9f\x77\x54\x86\x3b\xa9\xd8\x06\xd6\x88\x0e\xb1\x61\xe4\x1d\x55\x0d\xa3\x74\x87\x38\x90\x67\xac\x5d\x6f\x35\x45\x71\x16\x57\x79\xb9\xbe\x46\x6f\x35\x7b\x38\xcb\x70\x1e\xad\xf3\xac\x32\x54\xce\x46\x2f\x [...] }, - "/camel-catalog-1.4.1-main.yaml": &vfsgen۰CompressedFileInfo{ - name: "camel-catalog-1.4.1-main.yaml", + "/camel-catalog-1.5.0-SNAPSHOT-main.yaml": &vfsgen۰CompressedFileInfo{ + name: "camel-catalog-1.5.0-SNAPSHOT-main.yaml", modTime: time.Time{}, - uncompressedSize: 89151, + uncompressedSize: 89178, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\xbd\x5b\x77\xdb\x38\xb2\x2f\xfe\x9e\x4f\xc1\x35\x79\xd9\x7b\xfd\x47\x35\xdd\xce\xec\xe9\xff\xee\xf3\x64\xcb\x71\x62\xc7\x76\xdc\x91\x27\xc9\xf4\x4b\x2f\x88\x84\x24\x48\x24\x41\x03\xd0\xc5\xfe\xf4\x67\xe1\xc2\xab\x20\x48\x24\x0c\xaf\xe3\x07\x93\x22\xaa\x7e\xc5\x22\x48\x5c\x0a\x85\xaa\xf7\xd1\xe8\xf5\xfe\xde\xbd\x8f\x6e\x49\x8c\x73\x8e\x93\x48\xd0\x48\x2c\x70\x74\x5e\xa0\x78\x81\xa3\x09\x9d\x89\x2d\x62\x38\xba\xa2\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x7d\x5b\x77\xdb\x38\xb2\xee\x7b\x7e\x05\xd7\xe4\x65\xef\x75\x86\x35\x3d\xce\xcc\xf4\xd9\x7d\x9e\x6c\x39\x4e\xec\xd8\x8e\x3b\xf2\x24\x99\x7e\xe9\x05\x91\x90\x04\x8b\x24\x68\x00\x92\x65\xff\xfa\xb3\x70\xe1\x55\x10\x24\x12\x86\xd7\xf6\x83\x49\x11\x55\x5f\xb1\x08\x12\x97\x42\xa1\xea\x7d\x14\xbf\xde\xdf\xbb\xf7\xd1\x35\x49\x70\xc1\x71\x1a\x09\x1a\x89\x25\x8e\x4e\x4b\x94\x2c\x71\x34\xa5\x73\xf1\x84\x18\x8e\x2e\xe8\xba\x [...] }, - "/camel-catalog-1.4.1-quarkus.yaml": &vfsgen۰CompressedFileInfo{ - name: "camel-catalog-1.4.1-quarkus.yaml", + "/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml": &vfsgen۰CompressedFileInfo{ + name: "camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml", modTime: time.Time{}, - uncompressedSize: 49007, + uncompressedSize: 49034, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x5d\xdf\x77\xdb\xa8\xf2\x7f\xcf\x5f\xa1\xd3\xbc\xdc\x7b\xce\x8a\xdd\x4d\xf7\xee\x43\xbf\x4f\x49\xda\x74\x93\x36\x69\x1a\xe7\xb6\xdd\x7d\xe9\xc1\x12\xb6\x89\x25\x50\x00\xd9\x4e\xfe\xfa\xef\x01\x21\x4b\xb2\x95\xd1\x8f\x80\x6f\x1e\x22\x59\x0c\x9f\x81\x01\xc1\x68\x98\x81\xe3\x20\x74\xf7\x77\x74\x1c\x7c\xa6\x11\x61\x92\xc4\x81\xe2\x81\x5a\x90\xe0\x34\xc3\xd1\x82\x04\x13\x3e\x53\x6b\x2c\x48\x70\xc1\x73\x16\x63\x45\x39\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x7d\x5f\x77\xdb\x2c\xf2\xff\x7d\x5e\x85\x4e\x73\xb3\x7b\xce\x23\x9e\x6e\xba\xfb\x5c\xf4\x77\x95\xa4\x4d\x9b\xb4\x49\xd3\x38\xdb\x76\x9f\x9b\x1e\x2c\x61\x9b\x58\x02\x05\x90\xed\xe4\xd5\xff\x0e\x08\x59\x92\xad\x8c\xfe\x04\xfc\xcd\x45\x24\x8b\xe1\x33\x30\x20\x18\x0d\x33\x70\x1c\x84\xee\xfe\x8e\x8e\x83\xaf\x34\x22\x4c\x92\x38\x50\x3c\x50\x0b\x12\x9c\x66\x38\x5a\x90\x60\xc2\x67\x6a\x8d\x05\x09\x2e\x78\xce\x62\xac\x28\x [...] }, "/cr-example.yaml": &vfsgen۰CompressedFileInfo{ name: "cr-example.yaml", @@ -189,9 +189,9 @@ var assets = func() http.FileSystem { "/operator-role-knative.yaml": &vfsgen۰CompressedFileInfo{ name: "operator-role-knative.yaml", modTime: time.Time{}, - uncompressedSize: 1423, + uncompressedSize: 1447, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x53\xc1\x8e\xdb\x36\x10\xbd\xf3\x2b\x1e\xac\x4b\x52\xac\xe5\xa6\xa7\xc2\x3d\xb9\xc9\x6e\x2b\x34\xb0\x81\x95\xd3\x20\xc7\x31\x35\x96\x06\xa6\x48\x75\x48\x59\xd9\x7e\x7d\x21\xda\x6e\x76\xb1\x87\x5e\x82\xf2\xe2\x31\xf5\xf4\xe6\xbd\x79\xa3\x02\xcb\xef\x77\x4c\x81\x8f\x62\xd9\x47\x6e\x90\x02\x52\xc7\xd8\x0c\x64\x3b\x46\x1d\x8e\x69\x22\x65\x3c\x84\xd1\x37\x94\x24\x78\xbc\xd9\xd4\x0f\x6f\x31\xfa\x86\x15\xc1\x33\x82\xa2\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x53\xc1\x8e\xdb\x36\x10\xbd\xf3\x2b\x1e\xac\x4b\x52\xac\xe5\xa6\xa7\xc2\x3d\xb9\x9b\xdd\x56\x68\x60\x03\x2b\xa7\x41\x8e\x63\x6a\x2c\x0d\x4c\x91\xea\x90\xb2\xb2\xfd\xfa\x42\xb2\xdc\x78\xb1\x45\x4f\x41\x78\xd1\x88\x7c\x7a\xf3\x1e\xdf\x28\xc3\xf2\xdb\x2d\x93\xe1\x83\x58\xf6\x91\x2b\xa4\x80\xd4\x30\x36\x1d\xd9\x86\x51\x86\x63\x1a\x48\x19\x8f\xa1\xf7\x15\x25\x09\x1e\x6f\x36\xe5\xe3\x5b\xf4\xbe\x62\x45\xf0\x8c\xa0\x68\x [...] }, "/operator-role-kubernetes.yaml": &vfsgen۰CompressedFileInfo{ name: "operator-role-kubernetes.yaml", @@ -372,8 +372,8 @@ var assets = func() http.FileSystem { fs["/builder-role-kubernetes.yaml"].(os.FileInfo), fs["/builder-role-openshift.yaml"].(os.FileInfo), fs["/builder-service-account.yaml"].(os.FileInfo), - fs["/camel-catalog-1.4.1-main.yaml"].(os.FileInfo), - fs["/camel-catalog-1.4.1-quarkus.yaml"].(os.FileInfo), + fs["/camel-catalog-1.5.0-SNAPSHOT-main.yaml"].(os.FileInfo), + fs["/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml"].(os.FileInfo), fs["/cr-example.yaml"].(os.FileInfo), fs["/crd-build.yaml"].(os.FileInfo), fs["/crd-camel-catalog.yaml"].(os.FileInfo), diff --git a/examples/knative/messages-channel.yaml b/examples/knative/messages-channel.yaml index fe44738..abd3483 100644 --- a/examples/knative/messages-channel.yaml +++ b/examples/knative/messages-channel.yaml @@ -15,7 +15,7 @@ # limitations under the License. # --------------------------------------------------------------------------- -apiVersion: messaging.knative.dev/v1alpha1 +apiVersion: messaging.knative.dev/v1beta1 kind: InMemoryChannel metadata: name: messages diff --git a/examples/knative/words-channel.yaml b/examples/knative/words-channel.yaml index 94e0ec1..33e5004 100644 --- a/examples/knative/words-channel.yaml +++ b/examples/knative/words-channel.yaml @@ -15,7 +15,7 @@ # limitations under the License. # --------------------------------------------------------------------------- -apiVersion: messaging.knative.dev/v1alpha1 +apiVersion: messaging.knative.dev/v1beta1 kind: InMemoryChannel metadata: name: words diff --git a/go.sum b/go.sum index e28b52d..c5bf1b1 100644 --- a/go.sum +++ b/go.sum @@ -1106,7 +1106,9 @@ github.com/radovskyb/watcher v1.0.6/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= +github.com/robfig/cron v0.0.0-20170526150127-736158dc09e1 h1:NZInwlJPD/G44mJDgBEMFvBfbv/QQKCrpo+az/QXn8c= github.com/robfig/cron v0.0.0-20170526150127-736158dc09e1/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml index 46948b2..d4f7ef4 100644 --- a/helm/camel-k/templates/operator-role.yaml +++ b/helm/camel-k/templates/operator-role.yaml @@ -203,6 +203,7 @@ rules: - apiGroups: - eventing.knative.dev - messaging.knative.dev + - sources.knative.dev resources: - "*" verbs: diff --git a/pkg/apis/addtoscheme_knative_eventing.go b/pkg/apis/addtoscheme_knative_eventing.go index ef0486b..de1ef1e 100644 --- a/pkg/apis/addtoscheme_knative_eventing.go +++ b/pkg/apis/addtoscheme_knative_eventing.go @@ -22,6 +22,7 @@ import ( eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" ) func init() { @@ -30,4 +31,5 @@ func init() { AddToSchemes = append(AddToSchemes, eventingv1beta1.AddToScheme) AddToSchemes = append(AddToSchemes, messagingv1alpha1.AddToScheme) AddToSchemes = append(AddToSchemes, messagingv1beta1.AddToScheme) + AddToSchemes = append(AddToSchemes, sourcesv1alpha1.AddToScheme) } diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index daf4d73..2aee1e2 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -18,23 +18,23 @@ limitations under the License. package trait import ( + "fmt" "net/url" + "reflect" "strings" - "github.com/pkg/errors" - - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - - eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - serving "knative.dev/serving/pkg/apis/serving/v1" - v1 "github.com/apache/camel-k/pkg/apis/camel/v1" knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative" "github.com/apache/camel-k/pkg/metadata" "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/envvar" knativeutil "github.com/apache/camel-k/pkg/util/knative" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + serving "knative.dev/serving/pkg/apis/serving/v1" ) // The Knative trait automatically discovers addresses of Knative resources and inject them into the @@ -73,6 +73,10 @@ type knativeTrait struct { FilterSourceChannels *bool `property:"filter-source-channels" json:"filterSourceChannels,omitempty"` // Enables Knative CamelSource pre 0.15 compatibility fixes (will be removed in future versions). CamelSourceCompat *bool `property:"camel-source-compat" json:"camelSourceCompat,omitempty"` + // Allows binding the integration to a sink via a Knative SinkBinding resource. + // This can be used when the integration targets a single sink. + // It's disabled by default. + SinkBinding *bool `property:"sink-binding" json:"sinkBinding,omitempty"` // Enable automatic discovery of all trait properties. Auto *bool `property:"auto" json:"auto,omitempty"` } @@ -201,6 +205,10 @@ func (t *knativeTrait) Apply(e *Environment) error { } } + if t.SinkBinding != nil && *t.SinkBinding { + util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-knative") + } + if len(t.ChannelSources) > 0 || len(t.EndpointSources) > 0 || len(t.EventSources) > 0 { util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP) } @@ -225,6 +233,9 @@ func (t *knativeTrait) Apply(e *Environment) error { if err := t.configureEvents(e, &env); err != nil { return err } + if err := t.configureSinkBinding(e, &env); err != nil { + return err + } conf, err := env.Serialize() if err != nil { @@ -268,19 +279,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn return err } - // Sinks - err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink, - func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error { - svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, - knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind) - if err != nil { - return err - } - env.Services = append(env.Services, svc) - return nil - }) - if err != nil { - return err + if t.SinkBinding == nil || !*t.SinkBinding { + // Sinks + err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink, + func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error { + svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, + knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind) + if err != nil { + return err + } + env.Services = append(env.Services, svc) + return nil + }) + if err != nil { + return err + } } return nil @@ -319,18 +332,20 @@ func (t *knativeTrait) configureEndpoints(e *Environment, env *knativeapi.CamelE } // Sinks - err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink, - func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error { - svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, - knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind) - if err != nil { - return err - } - env.Services = append(env.Services, svc) - return nil - }) - if err != nil { - return err + if t.SinkBinding == nil || !*t.SinkBinding { + err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink, + func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error { + svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, + knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind) + if err != nil { + return err + } + env.Services = append(env.Services, svc) + return nil + }) + if err != nil { + return err + } } return nil @@ -366,23 +381,96 @@ func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvi } // Sinks - err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink, - func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error { - svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, - knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind) - if err != nil { - return err - } - env.Services = append(env.Services, svc) - return nil - }) - if err != nil { - return err + if t.SinkBinding == nil || !*t.SinkBinding { + err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink, + func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error { + svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, + knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind) + if err != nil { + return err + } + env.Services = append(env.Services, svc) + return nil + }) + if err != nil { + return err + } } return nil } +func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error { + if t.SinkBinding == nil || !*t.SinkBinding { + return nil + } + var serviceType knativeapi.CamelServiceType + services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel) + if len(services) > 0 { + serviceType = knativeapi.CamelServiceTypeChannel + } + services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...) + if len(serviceType) == 0 && len(services) > 0 { + serviceType = knativeapi.CamelServiceTypeEndpoint + } + services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...) + if len(serviceType) == 0 && len(services) > 0 { + serviceType = knativeapi.CamelServiceTypeEvent + } + + if len(services) != 1 { + return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", len(services)) + } + + err := t.withServiceDo(false, e, env, services, serviceType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, url *url.URL, serviceURI string) error { + util.StringSliceUniqueAdd(&e.Interceptors, "knative-sink-binding") + e.ApplicationProperties["loader.interceptor.knative-sink-binding.name"] = ref.Name + e.ApplicationProperties["loader.interceptor.knative-sink-binding.type"] = string(serviceType) + e.ApplicationProperties["loader.interceptor.knative-sink-binding.kind"] = ref.Kind + e.ApplicationProperties["loader.interceptor.knative-sink-binding.api-version"] = ref.APIVersion + + if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) { + e.PostStepProcessors = append(e.PostStepProcessors, func(e *Environment) error { + sinkBindingInjected := false + e.Resources.Visit(func(object runtime.Object) { + gvk := object.GetObjectKind().GroupVersionKind() + if gvk.Kind == "SinkBinding" && strings.Contains(gvk.Group, "knative") { + sinkBindingInjected = true + } + }) + if sinkBindingInjected { + return nil + } + + controller := e.Resources.GetController(func(object runtime.Object) bool { + return true + }) + if controller != nil && !reflect.ValueOf(controller).IsNil() { + gvk := controller.GetObjectKind().GroupVersionKind() + av, k := gvk.ToAPIVersionAndKind() + source := corev1.ObjectReference{ + Kind: k, + Namespace: e.Integration.Namespace, + Name: e.Integration.Name, + APIVersion: av, + } + target := corev1.ObjectReference{ + Kind: ref.Kind, + Namespace: e.Integration.Namespace, + Name: ref.Name, + APIVersion: ref.APIVersion, + } + e.Resources.Add(knativeutil.CreateSinkBinding(source, target)) + } + return nil + }) + } + return nil + }) + + return err +} + func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string) { // TODO extend to additional filters too, to filter them at source and not at destination found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool { diff --git a/pkg/util/defaults/defaults.go b/pkg/util/defaults/defaults.go index a2b2a84..0b1e481 100644 --- a/pkg/util/defaults/defaults.go +++ b/pkg/util/defaults/defaults.go @@ -26,7 +26,7 @@ const ( Version = "1.1.0-SNAPSHOT" // DefaultRuntimeVersion -- - DefaultRuntimeVersion = "1.4.1" + DefaultRuntimeVersion = "1.5.0-SNAPSHOT" // BuildahVersion -- BuildahVersion = "1.14.0" diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go index e39eb54..5df1247 100644 --- a/pkg/util/knative/knative.go +++ b/pkg/util/knative/knative.go @@ -31,9 +31,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - messaging "knative.dev/eventing/pkg/apis/messaging/v1alpha1" + messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1" + sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" + duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + "knative.dev/pkg/tracker" serving "knative.dev/serving/pkg/apis/serving/v1" controller "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -98,6 +101,40 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e return &subs } +// CreateSinkBinding --- +func CreateSinkBinding(source corev1.ObjectReference, target corev1.ObjectReference) runtime.Object { + binding := sourcesv1alpha1.SinkBinding{ + TypeMeta: metav1.TypeMeta{ + APIVersion: sourcesv1alpha1.SchemeGroupVersion.String(), + Kind: "SinkBinding", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: source.Namespace, + Name: source.Name, + }, + Spec: sourcesv1alpha1.SinkBindingSpec{ + BindingSpec: duckv1alpha1.BindingSpec{ + Subject: tracker.Reference{ + APIVersion: source.APIVersion, + Kind: source.Kind, + Name: source.Name, + }, + }, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: target.APIVersion, + Kind: target.Kind, + Name: target.Name, + }, + }, + }, + }, + } + + return &binding +} + // GetAddressableReference looks up the resource among all given types and returns an object reference to it func GetAddressableReference(ctx context.Context, c client.Client, possibleReferences []corev1.ObjectReference, namespace string, name string) (*corev1.ObjectReference, error) { diff --git a/pkg/util/kubernetes/collection.go b/pkg/util/kubernetes/collection.go index bbfb5d2..e04ddbf 100644 --- a/pkg/util/kubernetes/collection.go +++ b/pkg/util/kubernetes/collection.go @@ -254,6 +254,17 @@ func (c *Collection) GetRoute(filter func(*routev1.Route) bool) *routev1.Route { return retValue } +// GetCronJob returns a CronJob that matches the given function +func (c *Collection) GetCronJob(filter func(job *v1beta1.CronJob) bool) *v1beta1.CronJob { + var retValue *v1beta1.CronJob + c.VisitCronJob(func(re *v1beta1.CronJob) { + if filter(re) { + retValue = re + } + }) + return retValue +} + // VisitCronJob executes the visitor function on all CronJob resources func (c *Collection) VisitCronJob(visitor func(*v1beta1.CronJob)) { c.Visit(func(res runtime.Object) { @@ -357,6 +368,29 @@ func (c *Collection) VisitContainer(visitor func(container *corev1.Container)) { }) } +// GetController returns the controller associated with the integration (e.g. Deployment, Knative Service or CronJob) +func (c *Collection) GetController(filter func(object runtime.Object) bool) runtime.Object { + d := c.GetDeployment(func(deployment *appsv1.Deployment) bool { + return filter(deployment) + }) + if d != nil { + return d + } + svc := c.GetKnativeService(func(service *serving.Service) bool { + return filter(service) + }) + if svc != nil { + return svc + } + cj := c.GetCronJob(func(job *v1beta1.CronJob) bool { + return filter(job) + }) + if cj != nil { + return cj + } + return nil +} + // VisitPodSpec executes the visitor function on all PodSpec inside deployments or other resources func (c *Collection) VisitPodSpec(visitor func(container *corev1.PodSpec)) { c.VisitDeployment(func(d *appsv1.Deployment) { diff --git a/script/Makefile b/script/Makefile index fabc398..56f433c 100644 --- a/script/Makefile +++ b/script/Makefile @@ -16,7 +16,7 @@ VERSIONFILE := pkg/util/defaults/defaults.go VERSION := 1.1.0-SNAPSHOT LAST_RELEASED_VERSION := 1.0.1 -RUNTIME_VERSION := 1.4.1 +RUNTIME_VERSION := 1.5.0-SNAPSHOT BUILDAH_VERSION := 1.14.0 KANIKO_VERSION := 0.17.1 BASE_IMAGE := adoptopenjdk/openjdk11:slim @@ -30,7 +30,7 @@ LINT_DEADLINE := 10m # Used to push pre-release artifacts STAGING_IMAGE_NAME := docker.io/camelk/camel-k -STAGING_RUNTIME_REPO := https://repository.apache.org/content/repositories/orgapachecamel-1235 +STAGING_RUNTIME_REPO := # When packaging artifacts into the docker image, you can "copy" them from local maven # or "download" them from Apache Snapshots and Maven Central