This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch kafka-source in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit ce4727bc6a7433b2712e951c53b46b8af322d592 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Feb 9 08:33:56 2021 +0100 Added a Java example of kafka source to AWS S3 --- examples/kafka-source-s3/README.md | 141 +++++++++++++++++++++ examples/kafka-source-s3/data/MyRoutes.java | 26 ++++ .../kafka-source-s3/data/application.properties | 44 +++++++ examples/kafka-source-s3/pom.xml | 105 +++++++++++++++ .../src/main/resources/application.properties | 16 +++ examples/pom.xml | 1 + 6 files changed, 333 insertions(+) diff --git a/examples/kafka-source-s3/README.md b/examples/kafka-source-s3/README.md new file mode 100644 index 0000000..0f75ac0 --- /dev/null +++ b/examples/kafka-source-s3/README.md @@ -0,0 +1,141 @@ +== Camel-K-runtime Java Example for kafka consumer to AWS S3 + +This example shows the usage of Camel-k-runtime for kafka consumer to AWS S3 + +The route involves the kafka and aws2-s3 components + +=== Setup + +You'll need to have a kafka instance running on your machine or in docker. +You'll need AWS Credentials. + +=== How to run + +You have two ways of doing this. + +First approach: + + mvn exec:exec + +This approach will pack and run a camel-quarkus runner. 
+ +Second approach + + mvn clean package + export CAMEL_K_CONF=${project.basedir}/data/application.properties + export CAMEL_K_ROUTES=file:${project.basedir}/data/MyRoutes.java + java -jar target/camel-k-runtime-example-java-kafka-s3-runner.jar + +You should get the following output in both cases + +2021-02-09 08:27:13,463 INFO [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.7.0-SNAPSHOT +2021-02-09 08:27:13,482 INFO [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime +2021-02-09 08:27:13,488 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: file:/home/oscerd/workspace/apache-camel/camel-k-runtime/examples/kafka-source-s3/data/MyRoutes.java +2021-02-09 08:27:14,345 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.7.0 (camel-q) is starting +2021-02-09 08:27:14,345 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) StreamCaching is enabled on CamelContext: camel-q +2021-02-09 08:27:15,724 INFO [org.apa.cam.imp.eng.DefaultStreamCachingStrategy] (main) StreamCaching in use with spool directory: /tmp/camel-q and rules: [Spool > 128K body size] +2021-02-09 08:27:15,725 INFO [org.apa.cam.com.kaf.KafkaConsumer] (main) Starting Kafka consumer on topic: testtopic with breakOnFirstError: false +2021-02-09 08:27:15,747 INFO [org.apa.kaf.cli.con.ConsumerConfig] (main) ConsumerConfig values: + allow.auto.create.topics = true + auto.commit.interval.ms = 5000 + auto.offset.reset = latest + bootstrap.servers = [localhost:9092] + check.crcs = true + client.dns.lookup = default + client.id = + client.rack = + connections.max.idle.ms = 540000 + default.api.timeout.ms = 60000 + enable.auto.commit = true + exclude.internal.topics = true + fetch.max.bytes = 52428800 + fetch.max.wait.ms = 500 + fetch.min.bytes = 1 + group.id = 94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b + group.instance.id = null + heartbeat.interval.ms = 3000 + interceptor.classes = [] + internal.leave.group.on.close = 
true + isolation.level = read_uncommitted + key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer + max.partition.fetch.bytes = 1048576 + max.poll.interval.ms = 300000 + max.poll.records = 500 + metadata.max.age.ms = 300000 + metric.reporters = [] + metrics.num.samples = 2 + metrics.recording.level = INFO + metrics.sample.window.ms = 30000 + partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] + receive.buffer.bytes = 65536 + reconnect.backoff.max.ms = 1000 + reconnect.backoff.ms = 50 + request.timeout.ms = 40000 + retry.backoff.ms = 100 + sasl.client.callback.handler.class = null + sasl.jaas.config = null + sasl.kerberos.kinit.cmd = /usr/bin/kinit + sasl.kerberos.min.time.before.relogin = 60000 + sasl.kerberos.service.name = null + sasl.kerberos.ticket.renew.jitter = 0.05 + sasl.kerberos.ticket.renew.window.factor = 0.8 + sasl.login.callback.handler.class = null + sasl.login.class = null + sasl.login.refresh.buffer.seconds = 300 + sasl.login.refresh.min.period.seconds = 60 + sasl.login.refresh.window.factor = 0.8 + sasl.login.refresh.window.jitter = 0.05 + sasl.mechanism = GSSAPI + security.protocol = PLAINTEXT + security.providers = null + send.buffer.bytes = 131072 + session.timeout.ms = 10000 + ssl.cipher.suites = null + ssl.enabled.protocols = [TLSv1.2] + ssl.endpoint.identification.algorithm = https + ssl.key.password = null + ssl.keymanager.algorithm = SunX509 + ssl.keystore.location = null + ssl.keystore.password = null + ssl.keystore.type = JKS + ssl.protocol = TLSv1.2 + ssl.provider = null + ssl.secure.random.implementation = null + ssl.trustmanager.algorithm = PKIX + ssl.truststore.location = null + ssl.truststore.password = null + ssl.truststore.type = JKS + value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer + +2021-02-09 08:27:15,850 WARN [org.apa.kaf.cli.con.ConsumerConfig] (main) The configuration 'specific.avro.reader' was supplied but isn't a known config. 
+2021-02-09 08:27:15,851 INFO [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka version: 2.5.0 +2021-02-09 08:27:15,851 INFO [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka commitId: 66563e712b0b9f84 +2021-02-09 08:27:15,851 INFO [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka startTimeMs: 1612855635850 +2021-02-09 08:27:15,854 INFO [org.apa.cam.imp.eng.InternalRouteStartupManager] (main) Route: route1 started and consuming from: kafka://testtopic +2021-02-09 08:27:15,854 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) Subscribing testtopic-Thread 0 to topic testtopic +2021-02-09 08:27:15,855 INFO [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Subscribed to topic(s): testtopic +2021-02-09 08:27:15,857 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Total 1 routes, of which 1 are started +2021-02-09 08:27:15,857 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.7.0 (camel-q) started in 1s512ms +2021-02-09 08:27:15,860 INFO [io.quarkus] (main) camel-k-runtime-example-java 1.7.0-SNAPSHOT on JVM (powered by Quarkus 1.11.0.Final) started in 2.798s. +2021-02-09 08:27:15,860 INFO [io.quarkus] (main) Profile prod activated. 
+2021-02-09 08:27:15,861 INFO [io.quarkus] (main) Installed features: [camel-aws2-commons, camel-aws2-s3, camel-bean, camel-core, camel-endpointdsl, camel-k-core, camel-k-loader-java, camel-k-runtime, camel-kafka, camel-main, camel-support-common, camel-support-commons-logging, cdi] +2021-02-09 08:27:16,036 INFO [org.apa.kaf.cli.Metadata] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Cluster ID: ujmZ7YiORXCtQJ2h9USuEw +2021-02-09 08:27:16,037 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null) +2021-02-09 08:27:16,045 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] (Re-)joining group +2021-02-09 08:27:16,052 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group +2021-02-09 08:27:16,052 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] (Re-)joining group +2021-02-09 08:27:16,056 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer 
clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Finished assignment for group at generation 1: {consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1-bde70b4e-c3e5-4e67-b832-b258cbaad60d=Assignment(partitions=[testtopic-0])} +2021-02-09 08:27:16,062 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Successfully joined group with generation 1 +2021-02-09 08:27:16,065 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Adding newly assigned partitions: testtopic-0 +2021-02-09 08:27:16,072 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Found no committed offset for partition testtopic-0 +2021-02-09 08:27:16,083 INFO [org.apa.kaf.cli.con.int.SubscriptionState] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Resetting offset for partition testtopic-0 to offset 3. + +=== Help and contributions + +If you hit any problem using Camel or have some feedback, then please +https://camel.apache.org/support.html[let us know]. + +We also love contributors, so +https://camel.apache.org/contributing.html[get involved] :-) + +The Camel riders! 
diff --git a/examples/kafka-source-s3/data/MyRoutes.java b/examples/kafka-source-s3/data/MyRoutes.java new file mode 100644 index 0000000..104e16f --- /dev/null +++ b/examples/kafka-source-s3/data/MyRoutes.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.camel.builder.RouteBuilder; + +public class MyRoutes extends RouteBuilder { + @Override + public void configure() throws Exception { + from("kafka:{{kafkatopic}}") + .setHeader("CamelAwsS3Key",simple("${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}")) + .to("aws2-s3:camel-kafka-connector"); + } +} diff --git a/examples/kafka-source-s3/data/application.properties b/examples/kafka-source-s3/data/application.properties new file mode 100644 index 0000000..cea7485 --- /dev/null +++ b/examples/kafka-source-s3/data/application.properties @@ -0,0 +1,44 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. 
+## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# quarkus +# +quarkus.banner.enabled = false +quarkus.log.level = INFO +quarkus.log.category."org.apache.camel".level = INFO + +# +# camel - main +# +camel.main.name = camel-q +camel.main.stream-caching-enabled = true +camel.main.stream-caching-spool-directory = ${java.io.tmpdir}/camel-q + +# +# camel - aws2-s3 +# +camel.component.aws2-s3.accessKey=xxxx +camel.component.aws2-s3.secretKey=yyyy +camel.component.aws2-s3.region=region + +# +# camel - kafka +# +kafkatopic=testtopic +camel.component.kafka.brokers=localhost:9092 + diff --git a/examples/kafka-source-s3/pom.xml b/examples/kafka-source-s3/pom.xml new file mode 100644 index 0000000..57a9e77 --- /dev/null +++ b/examples/kafka-source-s3/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.camel.k</groupId> + <artifactId>camel-k-runtime-examples</artifactId> + <version>1.7.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>camel-k-runtime-example-java-kafka-s3</artifactId> + + <properties> + <noDeps>true</noDeps> + <quarkus.camel.main.routes-discovery.enabled>false</quarkus.camel.main.routes-discovery.enabled> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel.k</groupId> + <artifactId>camel-k-runtime</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.k</groupId> + <artifactId>camel-k-loader-java</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-aws2-s3</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-bootstrap-maven-plugin</artifactId> + <version>${quarkus-version}</version> + </plugin> + <plugin> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus-version}</version> + <executions> + <execution> + <id>build</id> + <goals> + <goal>build</goal> + </goals> + <configuration> + 
<finalName>${project.artifactId}</finalName> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${exec-maven-plugin-version}</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <workingDirectory>${project.basedir}</workingDirectory> + <arguments> + <argument>-jar</argument> + <argument>${project.build.directory}/${project.artifactId}-runner.jar</argument> + </arguments> + <environmentVariables> + <CAMEL_K_CONF>${project.basedir}/data/application.properties</CAMEL_K_CONF> + <CAMEL_K_ROUTES>file:${project.basedir}/data/MyRoutes.java</CAMEL_K_ROUTES> + </environmentVariables> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/examples/kafka-source-s3/src/main/resources/application.properties b/examples/kafka-source-s3/src/main/resources/application.properties new file mode 100644 index 0000000..fa7a54b --- /dev/null +++ b/examples/kafka-source-s3/src/main/resources/application.properties @@ -0,0 +1,16 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 9e000a0..624a698 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -40,6 +40,7 @@ <module>kotlin</module> <module>groovy</module> <module>java</module> + <module>kafka-source-s3</module> <module>xml</module> <module>cron</module> </modules>