This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2f86fcb8234a CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs
(#24096)
2f86fcb8234a is described below
commit 2f86fcb8234a13c00a45832b377f1752af1a2772
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 18 11:03:03 2026 +0200
CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs (#24096)
CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs and Java-only
markers
Signed-off-by: Claus Ibsen <[email protected]>
Co-authored-by: Claude <[email protected]>
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 608 ++++++++++++++++-----
1 file changed, 467 insertions(+), 141 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cbc11a29d825..e4a2d1e3e3a5 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -83,14 +83,23 @@ How Camel handles a message that results in an exception
can be altered using th
Instead of continuing to poll the next message, Camel will instead commit the
offset so that the message that caused the exception will be retried.
This is similar to the *RETRY* polling strategy above.
+You can configure this on the component level, either programmatically in
Java, or via configuration properties:
+
+._Java-only: programmatic component configuration_
[source,java]
----
KafkaComponent kafka = new KafkaComponent();
kafka.setBreakOnFirstError(true);
-...
camelContext.addComponent("kafka", kafka);
----
+Or using configuration properties:
+
+[source,properties]
+----
+camel.component.kafka.break-on-first-error=true
+----
+
It is recommended that you read the section below "Using manual commit with
Kafka consumer" to understand how `breakOnFirstError`
will work based on the `CommitManager` that is configured.
@@ -122,23 +131,18 @@ To use, this repository must be placed in the Camel
registry, either manually or
Sample usage is as follows:
+NOTE: The `KafkaIdempotentRepository` bean must be registered in the Camel
registry before it can be referenced by the route.
+
+._Java-only: registering the bean_
[source,java]
----
KafkaIdempotentRepository kafkaIdempotentRepository = new
KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
-registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be
registered in the registry, to enable access to the CamelContext
-CamelContext context = new CamelContext(registry);
-
-// later in RouteBuilder...
-from("direct:performInsert")
- .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo")
- // once-only insert into the database
- .end()
+registry.put("insertDbIdemRepo", kafkaIdempotentRepository);
----
-In XML:
-
+._XML: registering the bean_
[source,xml]
----
<!-- simple -->
@@ -166,18 +170,63 @@ In XML:
</bean>
----
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("direct:performInsert")
+ .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo")
+ .to("sql:INSERT INTO ...")
+ .end();
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+ <from uri="direct:performInsert"/>
+ <idempotentConsumer idempotentRepository="insertDbIdemRepo">
+ <header>id</header>
+ <to uri="sql:INSERT INTO ..."/>
+ </idempotentConsumer>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: direct:performInsert
+ steps:
+ - idempotentConsumer:
+ idempotentRepository: "#insertDbIdemRepo"
+ expression:
+ header: id
+ steps:
+ - to:
+ uri: "sql:INSERT INTO ..."
+----
+====
+
There are 3 alternatives to choose from when using idempotency with numeric
identifiers. The first one is to use the static method `numericHeader` method
from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the
conversion for you:
+._Java-only: numericHeader helper method_
[source,java]
----
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).idempotentRepository("insertDbIdemRepo")
- // once-only insert into the database
- .end()
+ .to("sql:INSERT INTO ...")
+ .end();
----
Alternatively, it is possible to use a custom serializer configured via the
route URL to perform the conversion:
+._Java-only: custom header deserializer class_
[source,java]
----
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
@@ -198,18 +247,18 @@ public class CustomHeaderDeserializer extends
DefaultKafkaHeaderDeserializer {
Lastly, it is also possible to do so in a processor:
+._Java-only: processor for header type conversion_
[source,java]
----
-from(from).routeId("foo")
+from("kafka:my-topic")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
-
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.idempotentRepository("kafkaIdempotentRepository")
- .to(to);
+ .to("direct:process");
----
=== Manual commits with the Kafka consumer
@@ -220,27 +269,38 @@ In case you want to force manual commits, you can use
`KafkaManualCommit` API fr
This requires turning on manual commits by either setting the option
`allowManualCommit` to `true` on the `KafkaComponent`
or on the endpoint, for example:
+._Java-only: programmatic component configuration_
[source,java]
----
KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
-// ...
camelContext.addComponent("kafka", kafka);
----
+Or using configuration properties:
+
+[source,properties]
+----
+camel.component.kafka.auto-commit-enable=false
+camel.component.kafka.allow-manual-commit=true
+----
+
By default, it uses the `NoopCommitManager` behind the scenes. To commit an
offset, you will
-require you to use the `KafkaManualCommit` from Java code such as a Camel
`Processor`:
+need to use the `KafkaManualCommit` from Java code such as a Camel `Processor`:
+._Java-only: manual commit processor_
[source,java]
----
public void process(Exchange exchange) {
KafkaManualCommit manual =
- exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
+ exchange.getIn().getHeader("CamelKafkaManualCommit",
KafkaManualCommit.class);
manual.commit();
}
----
+TIP: The header name `CamelKafkaManualCommit` is also available as the
constant `KafkaConstants.MANUAL_COMMIT`.
+
The `KafkaManualCommit` will force a synchronous commit which will block until
the commit is acknowledged on Kafka, or if it fails an exception is thrown.
You can use an asynchronous commit as well by configuring the
`KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory`
implementation.
@@ -252,6 +312,7 @@ on the `KafkaComponent` that creates instances of your
custom implementation.
When configuring a consumer to use manual commit and a specific
`CommitManager` it is important to understand how these influence the behavior
of `breakOnFirstError`
+._Java-only: programmatic component configuration_
[source,java]
----
KafkaComponent kafka = new KafkaComponent();
@@ -259,10 +320,19 @@ kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
kafka.setBreakOnFirstError(true);
kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory());
-...
camelContext.addComponent("kafka", kafka);
----
+Or using configuration properties:
+
+[source,properties]
+----
+camel.component.kafka.auto-commit-enable=false
+camel.component.kafka.allow-manual-commit=true
+camel.component.kafka.break-on-first-error=true
+camel.component.kafka.kafka-manual-commit-factory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory
+----
+
When the `CommitManager` is left to the default `NoopCommitManager` then
`breakOnFirstError` will not automatically commit the offset so that the
message with an error is retried. The consumer must manage that in the route
using `KafkaManualCommit`.
@@ -281,13 +351,14 @@ exception with the message `KafkaConsumer is not safe for
multi-threaded access`
The Kafka component supports pausable consumers. This type of consumer can
pause consuming data based on
conditions external to the component itself, such as an external system being
unavailable or other transient conditions.
+._Java-only: pausable consumer with lambda_
[source,java]
----
from("kafka:topic")
- .pausable(new KafkaConsumerListener(), () -> canContinue()) // the
pausable check gets called if the exchange fails to be processed ...
+ .pausable(new KafkaConsumerListener(), () -> canContinue())
.routeId("pausable-route")
- .process(this::process) // Kafka consumer will be paused if this one
throws an exception ...
- .to("some:destination"); // or this one
+ .process(this::process)
+ .to("some:destination");
----
In this example, consuming messages can pause (by calling the Kafka's Consumer
pause method) if the result from `canContinue` is false.
@@ -312,24 +383,84 @@ The following header value types are supported: `String`,
`Integer`, `Long`, `Do
Note: all headers propagated *from* kafka *to* camel exchange will contain
`byte[]` value by default.
To override default functionality, these uri parameters can be set:
`headerDeserializer` for `from` route and `headerSerializer` for `to` route.
For example:
+[tabs]
+====
+Java::
++
[source,java]
----
from("kafka:my_topic?headerDeserializer=#myDeserializer")
-...
-.to("kafka:my_topic?headerSerializer=#mySerializer")
+ .to("kafka:my_topic?headerSerializer=#mySerializer");
----
+XML::
++
+[source,xml]
+----
+<route>
+ <from uri="kafka:my_topic?headerDeserializer=#myDeserializer"/>
+ <to uri="kafka:my_topic?headerSerializer=#mySerializer"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:my_topic
+ parameters:
+ headerDeserializer: "#myDeserializer"
+ steps:
+ - to:
+ uri: kafka:my_topic
+ parameters:
+ headerSerializer: "#mySerializer"
+----
+====
+
By default, all headers are being filtered by `KafkaHeaderFilterStrategy`.
Strategy filters out headers which start with `Camel` or `org.apache.camel`
prefixes.
Default strategy can be overridden by using `headerFilterStrategy` uri
parameter in both `to` and `from` routes:
+[tabs]
+====
+Java::
++
[source,java]
----
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
-...
-.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
+ .to("kafka:my_topic?headerFilterStrategy=#myStrategy");
----
+XML::
++
+[source,xml]
+----
+<route>
+ <from uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+ <to uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:my_topic
+ parameters:
+ headerFilterStrategy: "#myStrategy"
+ steps:
+ - to:
+ uri: kafka:my_topic
+ parameters:
+ headerFilterStrategy: "#myStrategy"
+----
+====
+
`myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be
placed in the Camel registry, either manually or by registration as a bean in
Spring, as it is `CamelContext` aware.
=== Kafka Transaction
@@ -383,6 +514,7 @@ If both `transacted=true` and `transactionalId` are
present, the latter takes pr
Configure the 'krb5.conf' file directly through the API:
+._Java-only: static Kerberos configuration_
[source,java]
----
static {
@@ -390,6 +522,13 @@ static {
}
----
+Alternatively, you can set the JVM system property:
+
+[source,properties]
+----
+java.security.krb5.conf=/path/to/config/file
+----
+
=== Authentication to Kafka
Kafka supports several ways to authenticate the clients to the server,
including plain text, PKI (certificates) over TLS, you can refer to the
https://kafka.apache.org/documentation/#security_sasl[Kafka documentation] for
a detailed view of the supported mechanisms. The kafka authentication and
authorization is based on JAAS, so you must use a JAAS Login Module
implementation on the client side.
@@ -464,16 +603,45 @@ camel.component.kafka.sasl-password=mypassword
*OAuth Authentication Example*
+[tabs]
+====
+Java::
++
[source,java]
----
-from("kafka:my-topic?brokers=localhost:9092" +
- "&saslAuthType=OAUTH" +
- "&oauthClientId=my-client" +
- "&oauthClientSecret=my-secret" +
- "&oauthTokenEndpointUri=https://auth.example.com/oauth/token")
+from("kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token")
.to("log:received");
----
+XML::
++
+[source,xml]
+----
+<route>
+ <from
uri="kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token"/>
+ <to uri="log:received"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:my-topic
+ parameters:
+ brokers: "localhost:9092"
+ saslAuthType: OAUTH
+ oauthClientId: my-client
+ oauthClientSecret: my-secret
+ oauthTokenEndpointUri: "https://auth.example.com/oauth/token"
+ steps:
+ - to:
+ uri: log:received
+----
+====
+
*AWS MSK IAM Authentication*
When using AWS MSK with IAM authentication, ensure the `aws-msk-iam-auth`
library is on the classpath:
@@ -600,26 +768,23 @@ By default, Camel uses automatic commits when using batch
processing. In this ca
In case of failures, the records will not be processed.
The code below provides an example of this approach:
+
+._Java-only: batch processing with inline processor_
[source,java]
----
-public void configure() {
-
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e
-> {
- // The received records are stored as exchanges in a list. This gets
the list of those exchanges
+from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest")
+ .process(e -> {
final List<?> exchanges = e.getMessage().getBody(List.class);
-
- // Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
-
- // The records from the batch are stored in a list of exchanges in the
original exchange. To process, we iterate over that list
for (Object obj : exchanges) {
if (obj instanceof Exchange exchange) {
LOG.info("Processing exchange with body {}",
exchange.getMessage().getBody(String.class));
}
}
- }).to(KafkaTestUtil.MOCK_RESULT);
-}
+ })
+ .to("mock:result");
----
===== Handling Errors with Automatic Commits
@@ -630,44 +795,31 @@ It is recommended to implement appropriate error handling
mechanisms and pattern
The code below provides an example of handling errors with automatic commits:
+._Java-only: error handling with batch processing_
[source,java]
----
-public void configure() {
- /*
- We want to use continued here, so that Camel auto-commits the batch even
though part of it has failed. In a
- production scenario, applications should probably send these records to a
separate topic or fix the condition
- that lead to the failure
- */
- onException(IllegalArgumentException.class).process(exchange -> {
- LOG.warn("Failed to process batch {}",
exchange.getMessage().getBody());
- LOG.warn("Failed to process due to {}",
exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
- }).continued(true);
-
-
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e
-> {
- // The received records are stored as exchanges in a list. This gets
the list of those exchanges
- final List<?> exchanges = e.getMessage().getBody(List.class);
+onException(IllegalArgumentException.class).process(exchange -> {
+ LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
+ LOG.warn("Failed to process due to {}",
exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
+}).continued(true);
- // Ensure we are actually receiving what we are asking for
+from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest")
+ .process(e -> {
+ final List<?> exchanges = e.getMessage().getBody(List.class);
if (exchanges == null || exchanges.isEmpty()) {
return;
}
-
- // The records from the batch are stored in a list of exchanges in the
original exchange.
- int i = 0;
for (Object o : exchanges) {
if (o instanceof Exchange exchange) {
- i++;
LOG.info("Processing exchange with body {}",
exchange.getMessage().getBody(String.class));
-
- if (i == 4) {
- throw new IllegalArgumentException("Failed to process
record");
- }
}
}
- }).to(KafkaTestUtil.MOCK_RESULT);
-}
+ })
+ .to("mock:result");
----
+TIP: In a production scenario, applications should send failed records to a
separate topic (dead-letter queue) or fix the condition that led to the failure.
+
===== Break on First Error in Batching Mode
The `breakOnFirstError` option is also supported in batching mode, providing
the same error handling behavior as in streaming mode but applied to batch
processing.
@@ -775,7 +927,7 @@ onException(Exception.class)
.process(exchange -> {
// Commit manually when error occurs
KafkaManualCommit manual = exchange.getMessage()
- .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
manual.commit();
});
@@ -790,7 +942,7 @@
from("kafka:topic?groupId=myGroup&batching=true&breakOnFirstError=true&autoCommi
.process(exchange -> {
// Manual commit on successful processing
KafkaManualCommit manual = exchange.getMessage()
- .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
manual.commit();
})
.to("mock:result");
@@ -870,7 +1022,7 @@ public class ErrorCommitProcessor implements Processor {
LOG.warn("Error occurred, performing manual commit before
reconnection");
KafkaManualCommit manual = exchange.getMessage()
- .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
if (manual != null) {
manual.commit();
@@ -888,7 +1040,7 @@ public class SuccessCommitProcessor implements Processor {
LOG.debug("Batch processed successfully, performing manual commit");
KafkaManualCommit manual = exchange.getMessage()
- .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
if (manual != null) {
manual.commit();
@@ -905,33 +1057,25 @@ When working with batch processing with manual commits,
it's up to the applicati
The code below provides an example of this approach:
+._Java-only: batch processing with manual commits_
[source,java]
----
-public void configure() {
-
from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
.process(e -> {
- // The received records are stored as exchanges in a list. This gets
the list of those exchanges
final List<?> exchanges = e.getMessage().getBody(List.class);
-
- // Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
-
- /*
- Every exchange in that list should contain a reference to the manual
commit object. We use the reference
- for the last exchange in the list to commit the whole batch
- */
+ // Use the last exchange in the list to commit the whole batch
final Object tmp = exchanges.getLast();
if (tmp instanceof Exchange exchange) {
KafkaManualCommit manual =
-
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
+ exchange.getMessage().getHeader("CamelKafkaManualCommit",
KafkaManualCommit.class);
LOG.debug("Performing manual commit");
manual.commit();
LOG.debug("Done performing manual commit");
}
});
-}
----
==== Dealing with long polling timeouts
@@ -942,14 +1086,17 @@ To properly do so, first make sure to have a max polling
interval that is higher
Then, increase the shutdown timeout to ensure that committing, closing and
other Kafka operations are not abruptly aborted. For instance:
+._Java-only: programmatic shutdown timeout configuration_
[source,java]
----
-public void configure() {
- // Note that this can be configured in other ways
- getCamelContext().getShutdownStrategy().setTimeout(10000);
+getCamelContext().getShutdownStrategy().setTimeout(10000);
+----
- // route setup ...
-}
+Or using configuration properties:
+
+[source,properties]
+----
+camel.main.shutdown-timeout=10000
----
=== Custom Subscription Adapters
@@ -957,8 +1104,8 @@ public void configure() {
Applications with complex subscription logic may provide a custom bean to
handle the subscription process. To so, it is
necessary to implement the interface `SubscribeAdapter`.
+._Java-only: custom subscribe adapter class_
[source,java]
-.Example subscriber adapter that subscribes to a set of Kafka topics or
patterns
----
public class CustomSubscribeAdapter implements SubscribeAdapter {
@Override
@@ -972,12 +1119,12 @@ public class CustomSubscribeAdapter implements
SubscribeAdapter {
}
----
-Then, it is necessary to add it as named bean instance to the registry:
+Then, it is necessary to add it as a named bean instance to the registry:
+._Java-only: registering the bean_
[source,java]
-.Add to registry example
----
-context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new
CustomSubscribeAdapter());
+context.getRegistry().bind("subscribeAdapter", new CustomSubscribeAdapter());
----
=== Interoperability
@@ -1002,25 +1149,90 @@ To utilize this solution, you need to modify the route
URI on the consumer end o
`headerDeserializer` option.
For example:
+[tabs]
+====
+Java::
++
[source,java]
-.Route snippet
----
from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
- .to("...");
+ .to("direct:process");
----
+XML::
++
+[source,xml]
+----
+<route>
+ <from
uri="kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"/>
+ <to uri="direct:process"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:topic
+ parameters:
+ headerDeserializer:
"#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"
+ steps:
+ - to:
+ uri: direct:process
+----
+====
+
=== Producer Performance
If the producer is performing too slowly for your needs, you may want to
aggregate the exchanges before sending.
+[tabs]
+====
+Java::
++
[source,java]
-.Route snippet
----
-from("source")
- // .other route stuff
+from("direct:start")
.aggregate(constant(true), new GroupedExchangeAggregationStrategy())
- .to("kafka:topic");
+ .to("kafka:my-topic");
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+ <from uri="direct:start"/>
+ <aggregate aggregationStrategy="#groupedExchange">
+ <correlationExpression>
+ <constant>true</constant>
+ </correlationExpression>
+ <to uri="kafka:my-topic"/>
+ </aggregate>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: direct:start
+ steps:
+ - aggregate:
+ aggregationStrategy: "#groupedExchange"
+ correlationExpression:
+ constant: "true"
+ steps:
+ - to:
+ uri: kafka:my-topic
----
+====
+
+TIP: In XML and YAML, register the `GroupedExchangeAggregationStrategy` as a
bean named `groupedExchange` in the registry.
The reason for this is related to how the producer handles the two different
cases:
@@ -1183,68 +1395,162 @@ To keep the offsets, the component needs a
`StateRepository` implementation such
This bean should be available in the registry.
Here how to use it :
+NOTE: The `FileStateRepository` bean must be registered in the Camel registry
before it can be referenced by the route.
+
+._Java-only: registering the bean_
[source,java]
----
-// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new
File("/path/to/repo.dat"));
// Bind this repository into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);
+----
-// Configure the camel context
-DefaultCamelContext camelContext = new DefaultCamelContext(registry);
-camelContext.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("kafka:%s?brokers=localhost:{{kafkaPort}}" +
- // Set up the topic and broker address
- "&groupId=A" +
- // The consumer processor group ID
- "&autoOffsetReset=earliest" +
- // Ask to start from the beginning if we have unknown
offset
- "&offsetRepository=#offsetRepo", TOPIC)
- // Keep the offsets in the previously configured
repository
- .to("mock:result");
- }
-});
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("kafka:my-topic?brokers=localhost:9092&groupId=A&autoOffsetReset=earliest&offsetRepository=#offsetRepo")
+ .to("mock:result");
----
+XML::
++
+[source,xml]
+----
+<route>
+ <from
uri="kafka:my-topic?brokers=localhost:9092&groupId=A&autoOffsetReset=earliest&offsetRepository=#offsetRepo"/>
+ <to uri="mock:result"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:my-topic
+ parameters:
+ brokers: "localhost:9092"
+ groupId: A
+ autoOffsetReset: earliest
+ offsetRepository: "#offsetRepo"
+ steps:
+ - to:
+ uri: mock:result
+----
+====
+
=== Producing messages to Kafka
Here is the minimal route you need to produce messages to Kafka.
+[tabs]
+====
+Java::
++
[source,java]
----
from("direct:start")
- .setBody(constant("Message from Camel")) // Message to send
- .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+ .setBody(constant("Message from Camel"))
+ .setHeader("CamelKafkaKey", constant("Camel"))
.to("kafka:test?brokers=localhost:9092");
----
+XML::
++
+[source,xml]
+----
+<route>
+ <from uri="direct:start"/>
+ <setBody>
+ <constant>Message from Camel</constant>
+ </setBody>
+ <setHeader name="CamelKafkaKey">
+ <constant>Camel</constant>
+ </setHeader>
+ <to uri="kafka:test?brokers=localhost:9092"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: direct:start
+ steps:
+ - setBody:
+ constant: "Message from Camel"
+ - setHeader:
+ name: CamelKafkaKey
+ constant: "Camel"
+ - to:
+ uri: kafka:test
+ parameters:
+ brokers: "localhost:9092"
+----
+====
+
+TIP: In Java, you can use the constant `KafkaConstants.KEY` instead of the
string `"CamelKafkaKey"`.
+
=== SSL configuration
You have two different ways to configure the SSL communication on the Kafka
component.
The first way is through the many SSL endpoint parameters:
+[tabs]
+====
+Java::
++
[source,java]
----
-from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
- "&groupId=A" +
- "&sslKeystoreLocation=/path/to/keystore.jks" +
- "&sslKeystorePassword=changeit" +
- "&sslKeyPassword=changeit" +
- "&securityProtocol=SSL")
- .to("mock:result");
+from("kafka:my-topic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL")
+ .to("mock:result");
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+ <from
uri="kafka:my-topic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL"/>
+ <to uri="mock:result"/>
+</route>
----
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:my-topic
+ parameters:
+ brokers: "localhost:9092"
+ groupId: A
+ sslKeystoreLocation: "/path/to/keystore.jks"
+ sslKeystorePassword: changeit
+ sslKeyPassword: changeit
+ securityProtocol: SSL
+ steps:
+ - to:
+ uri: mock:result
+----
+====
+
The second way is to use the `sslContextParameters` endpoint parameter:
+._Java-only: SSLContextParameters bean setup_
[source,java]
----
-// Configure the SSLContextParameters object
KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/path/to/keystore.jks");
ksp.setPassword("changeit");
@@ -1254,26 +1560,46 @@ kmp.setKeyPassword("changeit");
SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);
-// Bind this SSLContextParameters into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("ssl", scp);
+----
-// Configure the camel context
-DefaultCamelContext camelContext = new DefaultCamelContext(registry);
-camelContext.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
- // Set up the topic and broker address
- "&groupId=A" +
- // The consumer processor group ID
- "&sslContextParameters=#ssl" +
- // The security protocol
- "&securityProtocol=SSL)
- // Reference the SSL configuration
- .to("mock:result");
- }
-});
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("kafka:my-topic?brokers=localhost:9092&groupId=A&sslContextParameters=#ssl&securityProtocol=SSL")
+ .to("mock:result");
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+ <from
uri="kafka:my-topic?brokers=localhost:9092&groupId=A&sslContextParameters=#ssl&securityProtocol=SSL"/>
+ <to uri="mock:result"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+ from:
+ uri: kafka:my-topic
+ parameters:
+ brokers: "localhost:9092"
+ groupId: A
+ sslContextParameters: "#ssl"
+ securityProtocol: SSL
+ steps:
+ - to:
+ uri: mock:result
----
+====
include::spring-boot:partial$starter.adoc[]