nathanhagemann commented on issue #336: URL: https://github.com/apache/camel-kafka-connector-examples/issues/336#issuecomment-1086959221
Ah, I don't have kafkacat. I did have some luck with a regular Camel route. pom.xml =====================================start <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.6</version> <relativePath /> <!-- lookup parent from repository --> </parent> <groupId>com.camel.route</groupId> <artifactId>kafka-to-cassandra-example</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka-to-cassandra-example</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <camel.version>3.14.0</camel.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel.springboot</groupId> <artifactId>camel-spring-boot-starter</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel.springboot</groupId> <artifactId>camel-jackson-starter</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel.springboot</groupId> <artifactId>camel-kafka-starter</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel.springboot</groupId> <artifactId>camel-cassandraql-starter</artifactId> <version>${camel.version}</version> </dependency> 
</dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> pom.xml =====================================end application.properties=============================start # Kafka Camel configuration logging.level.org.springframework: DEBUG camel.component.kafka.groupId=consumer1 camel.component.kafka.brokers=10.188.5.86:9092,10.188.5.86:9093,10.188.5.86:9094 # Cassandra cassandra.username=cameldevloader cassandra.password=newpassword cassandra.ip=10.66.16.10 cassandra.port=9042 cassandra.keyspace=dev application.properties=============================end PieRoute.java===================================start package com.camel.route.kafkatocassandraexample.route; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.dataformat.JsonLibrary; import org.springframework.stereotype.Component; @Component public class PieRoute extends RouteBuilder{ private String cql = "INSERT INTO pie (\r\n" + "pieid,\r\n" + "pietype,\r\n" + "pieinvented" + ") VALUES (\r\n" + "now(),\r\n" + "'${body[pietype]}',\r\n" + "${body[pieinvented]});"; @Override public void configure() throws Exception { //final String mysql = "insert into car (carnm) values ('Lexus');"; from("kafka:pie" + "?brokers={{camel.component.kafka.brokers}}" + "&groupId={{camel.component.kafka.groupId}}" + "&seekTo=beginning" ) .unmarshal().json(JsonLibrary.Jackson) .setHeader("CamelCqlQuery", simple(cql)) .setBody().simple("${null}") .process(new Processor() { public void process(Exchange exchange) throws Exception { String mycql = exchange.getMessage().getHeader("CamelCqlQuery").toString(); // if pieinvented is not provided, add null keyword to insert statement mycql = mycql.replace("\r\n)","\r\nnull)"); exchange.getIn().setHeader("CamelCqlQuery", mycql); //System.out.println(mycql); } }) 
.to("cql://{{cassandra.ip}}/{{cassandra.keyspace}}" + "?username={{cassandra.username}}" + "&password={{cassandra.password}}"); } } PieRoute.java===================================end <img width="1040" alt="kafka-pie" src="https://user-images.githubusercontent.com/44320042/161450812-78c639da-ee60-4384-90d5-23bf0a264c30.png"> <img width="585" alt="cassandra-pie" src="https://user-images.githubusercontent.com/44320042/161450816-86b0c3f6-999e-43b5-bced-2beeb2b06a12.png"> Hope this helps someone. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org