This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 45e7838010584a62de3a16e6e8424d650a544987 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Nov 4 07:40:03 2020 +0100 Slack Source: Use the Slack built-in transforms --- slack/slack-source/README.adoc | 290 ++--------------------------------------- 1 file changed, 10 insertions(+), 280 deletions(-) diff --git a/slack/slack-source/README.adoc b/slack/slack-source/README.adoc index fee853f..f9e80d4 100644 --- a/slack/slack-source/README.adoc +++ b/slack/slack-source/README.adoc @@ -39,151 +39,12 @@ Open the `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your choosen location -You'll need to build your connector starting from an archetype: - -``` -> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.6.0 -[INFO] Scanning for projects... -[INFO] -[INFO] ------------------< org.apache.maven:standalone-pom >------------------- -[INFO] Building Maven Stub Project (No POM) 1 -[INFO] --------------------------------[ pom ]--------------------------------- -[INFO] -[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> -[INFO] -[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< -[INFO] -[INFO] -[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- -[INFO] Generating project in Interactive mode -[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote -Define value for property 'groupId': org.apache.camel.kafkaconnector -Define value for property 'artifactId': slack-extended -Define value for property 'version' 1.0-SNAPSHOT: : 0.6.0 -Define value for property 'package' org.apache.camel.kafkaconnector: : -Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector -[INFO] Using property: camel-kafka-connector-version = 0.6.0 -Confirm properties configuration: -groupId: org.apache.camel.kafkaconnector -artifactId: slack-extended -version: 0.6.0 -package: org.apache.camel.kafkaconnector -camel-kafka-connector-name: camel-slack-kafka-connector -camel-kafka-connector-version: 0.6.0 - Y: : y -[INFO] ---------------------------------------------------------------------------- -[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.6.0 -[INFO] ---------------------------------------------------------------------------- -[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: artifactId, Value: slack-extended -[INFO] Parameter: version, Value: 0.6.0 -[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector -[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: version, Value: 0.6.0 -[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector -[INFO] Parameter: camel-kafka-connector-version, Value: 0.6.0 -[INFO] Parameter: artifactId, Value: slack-extended -[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended -[INFO] ------------------------------------------------------------------------ -[INFO] BUILD SUCCESS -[INFO] ------------------------------------------------------------------------ -[INFO] Total time: 39.295 s -[INFO] Finished at: 2020-10-13T09:16:51+02:00 -[INFO] ------------------------------------------------------------------------ -> cd /home/workspace/miscellanea/slack-extended -``` - -and add the following class in the main package - -``` -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.slack.source; - -import java.util.Map; - -import org.apache.camel.component.slack.helper.SlackMessage; -import org.apache.camel.kafkaconnector.utils.SchemaHelper; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.transforms.Transformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> { - public static final String FIELD_KEY_CONFIG = "key"; - public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Transforms String-based content from Kafka into a map"); - - private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class); - - @Override - public R apply(R r) { - Object value = r.value(); - - if (r.value() instanceof SlackMessage) { - LOG.debug("Converting record from SlackMessage to text"); - SlackMessage message = (SlackMessage) r.value(); - - LOG.debug("Received text: {}", message.getText()); - - return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), - SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp()); - - } else { - LOG.debug("Unexpected message type: {}", r.value().getClass()); - - return r; - } - } - - @Override - public ConfigDef config() { - return CONFIG_DEF; - } - - @Override - public void close() { - - } - - @Override - public void configure(Map<String, ?> map) { - - } -} -``` - -Now we need to build the connector: - -``` -> mvn clean package -``` - -In this example we'll use `/home/oscerd/connectors/` as plugin.path, but we'll need the generated zip from the previois build +In this example we'll use `/home/oscerd/connectors/` ``` > cd /home/oscerd/connectors/ -> cp /home/workspace/miscellanea/slack-extended/target/slack-extended-0.6.0-package.zip . -> unzip slack-extended-0.6.0-package.zip +> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-slack-kafka-connector/0.6.0/camel-slack-kafka-connector-0.6.0-package.zip +> unzip camel-slack-kafka-connector-0.6.0-package.zip ``` Now it's time to setup the connectors @@ -195,7 +56,7 @@ name=CamelSlackSourceConnector connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter transforms=SlackTransformer -transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer +transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.slack.transformers.SlackTransforms topics=mytopic @@ -284,144 +145,13 @@ if you have built the whole project (`mvn clean package`) decompress the connect so that each one is in its own subfolder (alternatively you can download the latest officially released and packaged connectors from maven): -In this case we need to extend an existing connector and add a Transform, so we need to leverage the archetype - -``` -> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.6.0 -[INFO] Scanning for projects... -[INFO] -[INFO] ------------------< org.apache.maven:standalone-pom >------------------- -[INFO] Building Maven Stub Project (No POM) 1 -[INFO] --------------------------------[ pom ]--------------------------------- -[INFO] -[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> -[INFO] -[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< -[INFO] -[INFO] -[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- -[INFO] Generating project in Interactive mode -[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote -Define value for property 'groupId': org.apache.camel.kafkaconnector -Define value for property 'artifactId': slack-extended -Define value for property 'version' 1.0-SNAPSHOT: : 0.6.0 -Define value for property 'package' org.apache.camel.kafkaconnector: : -Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector -[INFO] Using property: camel-kafka-connector-version = 0.6.0 -Confirm properties configuration: -groupId: org.apache.camel.kafkaconnector -artifactId: slack-extended -version: 0.6.0 -package: org.apache.camel.kafkaconnector -camel-kafka-connector-name: camel-slack-kafka-connector -camel-kafka-connector-version: 0.6.0 - Y: : y -[INFO] ---------------------------------------------------------------------------- -[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.6.0 -[INFO] ---------------------------------------------------------------------------- -[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: artifactId, Value: slack-extended -[INFO] Parameter: version, Value: 0.6.0 -[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector -[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: version, Value: 0.6.0 -[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector -[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector -[INFO] Parameter: camel-kafka-connector-version, Value: 0.6.0 -[INFO] Parameter: artifactId, Value: slack-extended -[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended -[INFO] ------------------------------------------------------------------------ -[INFO] BUILD SUCCESS -[INFO] ------------------------------------------------------------------------ -[INFO] Total time: 39.295 s -[INFO] Finished at: 2020-10-13T09:16:51+02:00 -[INFO] ------------------------------------------------------------------------ -> cd /home/workspace/miscellanea/slack-extended -``` -and add the following class in the main package - -``` -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.slack.source; - -import java.util.Map; - -import org.apache.camel.component.slack.helper.SlackMessage; -import org.apache.camel.kafkaconnector.utils.SchemaHelper; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.transforms.Transformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> { - public static final String FIELD_KEY_CONFIG = "key"; - public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Transforms String-based content from Kafka into a map"); - - private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class); - - @Override - public R apply(R r) { - Object value = r.value(); - - if (r.value() instanceof SlackMessage) { - LOG.debug("Converting record from SlackMessage to text"); - SlackMessage message = (SlackMessage) r.value(); - - LOG.debug("Received text: {}", message.getText()); - - return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), - SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp()); - - } else { - LOG.debug("Unexpected message type: {}", r.value().getClass()); - - return r; - } - } - - @Override - public ConfigDef config() { - return CONFIG_DEF; - } - - @Override - public void close() { - - } - - @Override - public void configure(Map<String, ?> map) { - - } -} -``` - -Now we need to build the connector: +So we need to do something like this: ``` -> mvn clean package +> cd my-connectors/ +> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-slack-kafka-connector/0.6.0/camel-slack-kafka-connector-0.6.0-package.zip +> unzip camel-slack-kafka-connector-0.6.0-package.zip ``` -And move the zip package in targe to my-connectors folder and unzipped it. Now we can start the build @@ -493,7 +223,7 @@ oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "transforms": "SlackTransformer", - "transforms.SlackTransformer.type": "org.apache.camel.kafkaconnector.SlackTransformer", + "transforms.SlackTransformer.type": "org.apache.camel.kafkaconnector.slack.transformers.SlackTransforms", "topics": "slack-topic", "camel.source.path.channel": "general", "camel.source.endpoint.token": "<token>" @@ -520,7 +250,7 @@ spec: config: key.converter: org.apache.kafka.connect.storage.StringConverter transforms: SlackTransformer - transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer + transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.slack.transformers.SlackTransforms topics: slack-topic camel.source.path.channel: general camel.source.endpoint.token: token