nathanhagemann commented on issue #336:
URL: 
https://github.com/apache/camel-kafka-connector-examples/issues/336#issuecomment-1086959221

   Ah, I don't have kafkacat. I did have some luck with a regular Camel route.
   
   pom.xml =====================================start
   <?xml version="1.0" encoding="UTF-8"?>
   <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>2.6.6</version>
                <relativePath /> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.camel.route</groupId>
        <artifactId>kafka-to-cassandra-example</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka-to-cassandra-example</name>
        <description>Demo project for Spring Boot</description>
        <properties>
                <java.version>1.8</java.version>
                <camel.version>3.14.0</camel.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-devtools</artifactId>
                        <scope>runtime</scope>
                        <optional>true</optional>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.apache.camel.springboot</groupId>
                        <artifactId>camel-spring-boot-starter</artifactId>
                        <version>${camel.version}</version>
                </dependency>
   
                <dependency>
                        <groupId>org.apache.camel.springboot</groupId>
                        <artifactId>camel-jackson-starter</artifactId>
                        <version>${camel.version}</version>
                </dependency>
   
                <dependency>
                        <groupId>org.apache.camel.springboot</groupId>
                        <artifactId>camel-kafka-starter</artifactId>
                        <version>${camel.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.camel.springboot</groupId>
                        <artifactId>camel-cassandraql-starter</artifactId>
                        <version>${camel.version}</version>
                </dependency>
        </dependencies>
        <build>
                <plugins>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        </plugin>
                </plugins>
        </build>
   </project>
   pom.xml =====================================end
   
   application.properties=============================start
   # Kafka Camel configuration
   logging.level.org.springframework: DEBUG
   camel.component.kafka.groupId=consumer1
   
camel.component.kafka.brokers=10.188.5.86:9092,10.188.5.86:9093,10.188.5.86:9094
   
   # Cassandra
   cassandra.username=cameldevloader
   cassandra.password=newpassword
   cassandra.ip=10.66.16.10
   cassandra.port=9042
   cassandra.keyspace=dev
   application.properties=============================end
   
   
   PieRoute.java===================================start
   package com.camel.route.kafkatocassandraexample.route;
   
   import org.apache.camel.Exchange;
   import org.apache.camel.Processor;
   import org.apache.camel.builder.RouteBuilder;
   import org.apache.camel.model.dataformat.JsonLibrary;
   import org.springframework.stereotype.Component;
   
   @Component
   public class PieRoute extends RouteBuilder{
        
        private String cql = "INSERT INTO pie (\r\n"
                        + "pieid,\r\n"
                        + "pietype,\r\n"
                        + "pieinvented"
                        + ") VALUES (\r\n"
                        + "now(),\r\n"
                        + "'${body[pietype]}',\r\n"
                        + "${body[pieinvented]});";
        
        @Override
        public void configure() throws Exception {
                //final String mysql = "insert into car (carnm) values 
('Lexus');";
                from("kafka:pie"
                                + "?brokers={{camel.component.kafka.brokers}}"
                                + "&groupId={{camel.component.kafka.groupId}}"
                                + "&seekTo=beginning"
                                )
                
                .unmarshal().json(JsonLibrary.Jackson)
                .setHeader("CamelCqlQuery", simple(cql))
                .setBody().simple("${null}")
                .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                        String mycql = 
exchange.getMessage().getHeader("CamelCqlQuery").toString();
                        // if pieinvented is not provided, add null keyword to 
insert statement
                        mycql = mycql.replace("\r\n)","\r\nnull)");
                        exchange.getIn().setHeader("CamelCqlQuery", mycql);
                    //System.out.println(mycql);
                }
             })
                .to("cql://{{cassandra.ip}}/{{cassandra.keyspace}}"
                                + "?username={{cassandra.username}}"
                                + "&password={{cassandra.password}}");
        }
   }
   
   PieRoute.java===================================end
   
   <img width="1040" alt="kafka-pie" src="https://user-images.githubusercontent.com/44320042/161450812-78c639da-ee60-4384-90d5-23bf0a264c30.png">
   <img width="585" alt="cassandra-pie" src="https://user-images.githubusercontent.com/44320042/161450816-86b0c3f6-999e-43b5-bced-2beeb2b06a12.png">
   
   
   Hope this helps someone.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to