This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit 86bed97dd8c169fa05e54649cccc8d01d710a5d1 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Oct 19 12:48:10 2023 +0200 Create a Specialized Kamelet starting from Kafka Source using Azure Schema Registry - Added Kamelet Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- ...kafka-azure-schema-registry-source.kamelet.yaml | 171 +++++++++++++++++++++ ...kafka-azure-schema-registry-source.kamelet.yaml | 171 +++++++++++++++++++++ 2 files changed, 342 insertions(+) diff --git a/kamelets/kafka-azure-schema-registry-source.kamelet.yaml b/kamelets/kafka-azure-schema-registry-source.kamelet.yaml new file mode 100644 index 00000000..e95856c5 --- /dev/null +++ b/kamelets/kafka-azure-schema-registry-source.kamelet.yaml @@ -0,0 +1,171 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-azure-schema-registry-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "4.1.0-SNAPSHOT" + camel.apache.org/kamelet.icon: " [...] + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Azure Kafka through Eventhubs with Azure Schema Registry Source" + description: |- + Receive data from Kafka topics on Azure Eventhubs on combined with Azure Schema Registry. + required: + - topic + - bootstrapServers + - apicurioRegistryUrl + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + securityProtocol: + title: Security Protocol + description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported + type: string + default: SASL_SSL + saslMechanism: + title: SASL Mechanism + description: The Simple Authentication and Security Layer (SASL) Mechanism used. + type: string + default: PLAIN + password: + title: Password + description: Password to authenticate to kafka + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + - urn:keda:authentication:password + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer + type: boolean + default: true + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + pollOnError: + title: Poll On Error Behavior + description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: true + valueDeserializer: + title: Value Deserializer + description: Deserializer class for value that implements the Deserializer interface. + type: string + default: "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer" + azureRegistryUrl: + title: Apicurio Registry URL + description: The Apicurio Schema Registry URL + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + specificAvroValueType: + title: Specific Avro Value Type + description: The Specific Type Avro will have to deal with + type: string + example: "com.example.Order" + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.1.0-SNAPSHOT" + - "camel:kafka" + - "camel:core" + - "camel:kamelet" + - "mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1" + - "mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4" + - "mvn:com.azure:azure-identity:1.9.0" + template: + beans: + - name: defaultAzureCredential + type: "#class:org.apache.camel.kamelets.utils.kafka.registry.DefaultAzureCredentialWrapper" + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + securityProtocol: "{{securityProtocol}}" + saslMechanism: "{{saslMechanism}}" + saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{connectionstring}};' + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + valueDeserializer: "{{valueDeserializer}}" + additionalProperties.schema.registry.url: "{{azureRegistryUrl}}" + additionalProperties.schema.group: avro + additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential' + additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):{{specificAvroValueType}}' + additionalProperties.specific.avro.reader: '#valueAs(boolean):true' + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink" diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-azure-schema-registry-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-azure-schema-registry-source.kamelet.yaml new file mode 100644 index 00000000..e95856c5 --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-azure-schema-registry-source.kamelet.yaml @@ -0,0 +1,171 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-azure-schema-registry-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "4.1.0-SNAPSHOT" + camel.apache.org/kamelet.icon: " [...] + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Azure Kafka through Eventhubs with Azure Schema Registry Source" + description: |- + Receive data from Kafka topics on Azure Eventhubs on combined with Azure Schema Registry. + required: + - topic + - bootstrapServers + - apicurioRegistryUrl + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + securityProtocol: + title: Security Protocol + description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported + type: string + default: SASL_SSL + saslMechanism: + title: SASL Mechanism + description: The Simple Authentication and Security Layer (SASL) Mechanism used. + type: string + default: PLAIN + password: + title: Password + description: Password to authenticate to kafka + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + - urn:keda:authentication:password + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer + type: boolean + default: true + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + pollOnError: + title: Poll On Error Behavior + description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: true + valueDeserializer: + title: Value Deserializer + description: Deserializer class for value that implements the Deserializer interface. + type: string + default: "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer" + azureRegistryUrl: + title: Apicurio Registry URL + description: The Apicurio Schema Registry URL + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + specificAvroValueType: + title: Specific Avro Value Type + description: The Specific Type Avro will have to deal with + type: string + example: "com.example.Order" + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.1.0-SNAPSHOT" + - "camel:kafka" + - "camel:core" + - "camel:kamelet" + - "mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1" + - "mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4" + - "mvn:com.azure:azure-identity:1.9.0" + template: + beans: + - name: defaultAzureCredential + type: "#class:org.apache.camel.kamelets.utils.kafka.registry.DefaultAzureCredentialWrapper" + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + securityProtocol: "{{securityProtocol}}" + saslMechanism: "{{saslMechanism}}" + saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{connectionstring}};' + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + valueDeserializer: "{{valueDeserializer}}" + additionalProperties.schema.registry.url: "{{azureRegistryUrl}}" + additionalProperties.schema.group: avro + additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential' + additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):{{specificAvroValueType}}' + additionalProperties.specific.avro.reader: '#valueAs(boolean):true' + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink"