This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/master by this push: new fde3e52 Polished fde3e52 is described below commit fde3e529e394a0708aba12b15d85a5c257709cdf Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Mar 20 17:24:25 2020 +0100 Polished --- .../org/apache/camel/example/kafka/MessageConsumerClient.java | 8 +++++++- .../org/apache/camel/example/kafka/MessagePublisherClient.java | 7 ++++--- .../camel-example-kafka/src/main/resources/application.properties | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java index 69c5dff..347ed5b 100644 --- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java +++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java @@ -18,6 +18,7 @@ package org.apache.camel.example.kafka; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.component.ComponentsBuilderFactory; import org.apache.camel.impl.DefaultCamelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,12 @@ public final class MessageConsumerClient { log.info("About to start route: Kafka Server -> Log "); - from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + // setup kafka component with the brokers + ComponentsBuilderFactory.kafka() + .brokers("{{kafka.host}}:{{kafka.port}}") + .register(camelContext, "kafka"); + + from("kafka:{{consumer.topic}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" diff --git a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java index 0aabc64..62fcf4e 100644 --- a/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java +++ b/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java @@ -52,21 +52,22 @@ public final class MessagePublisherClient { // setup kafka component with the brokers ComponentsBuilderFactory.kafka() + .brokers("{{kafka.host}}:{{kafka.port}}") .register(camelContext, "kafka"); from("direct:kafkaStart").routeId("DirectToKafka") - .to("kafka:{{producer.topic}}?brokers={{kafka.host}}:{{kafka.port}}").log("${headers}"); + .to("kafka:{{producer.topic}}").log("${headers}"); // Topic can be set in header as well. from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic") - .to("kafka:dummy?brokers={{kafka.host}}:{{kafka.port}}") + .to("kafka:dummy") .log("${headers}"); // Use custom partitioner based on the key. from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner") - .to("kafka:{{producer.topic}}?partitioner={{producer.partitioner}}&brokers={{kafka.host}}:{{kafka.port}}") + .to("kafka:{{producer.topic}}?partitioner={{producer.partitioner}}") .log("${headers}"); diff --git a/examples/camel-example-kafka/src/main/resources/application.properties b/examples/camel-example-kafka/src/main/resources/application.properties index d05ed13..d07e809 100644 --- a/examples/camel-example-kafka/src/main/resources/application.properties +++ b/examples/camel-example-kafka/src/main/resources/application.properties @@ -36,5 +36,5 @@ consumer.maxPollRecords=5000 # No of consumers that connect to Kafka server consumer.consumersCount=1 -# Get records from the begining +# Get records from the beginning consumer.seekTo=beginning