This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 3bcad339dec CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005) 3bcad339dec is described below commit 3bcad339dec96fc5928afc95751453f746a705cb Author: Jono Morris <j...@apache.org> AuthorDate: Tue Feb 6 02:20:31 2024 +1300 CAMEL-20382: camel-kafka - RecordMetadata header naming convention (#13005) * CAMEL-20382: camel-kafka - follow naming convention for RecordMetadata header * CAMEL-20382: use headername kafka.RECORD_META * CAMEL-20382: tweek upgrade guide * CAMEL-20382: mention that the header constant has changed --- .../resources/org/apache/camel/catalog/components/kafka.json | 2 +- .../resources/org/apache/camel/component/kafka/kafka.json | 2 +- .../java/org/apache/camel/component/kafka/KafkaConstants.java | 2 +- .../java/org/apache/camel/component/kafka/KafkaProducer.java | 2 +- .../camel/component/kafka/producer/support/ProducerUtil.java | 4 ++-- .../org/apache/camel/component/kafka/KafkaProducerTest.java | 10 +++++----- .../camel/component/kafka/integration/KafkaProducerFullIT.java | 10 +++++----- .../camel/component/kafka/integration/KafkaTransactionIT.java | 2 +- .../modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc | 6 ++++++ .../builder/endpoint/dsl/KafkaEndpointBuilderFactory.java | 7 +++---- 10 files changed, 26 insertions(+), 21 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index ae4af361362..f1a6c6e0dd2 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -150,7 +150,7 @@ "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if autoCommitEnable endpoint parameter is false or allowManualCommit is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...] "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" }, "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", "constantName": "org.apache.camel.componen [...] - "org.apache.kafka.clients.producer.RecordMetadata": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORDMETA" }, + "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" }, "CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer.", "constantName": "org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" } }, "properties": { diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index ae4af361362..f1a6c6e0dd2 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -150,7 +150,7 @@ "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if autoCommitEnable endpoint parameter is false or allowManualCommit is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...] "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" }, "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", "constantName": "org.apache.camel.componen [...] - "org.apache.kafka.clients.producer.RecordMetadata": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORDMETA" }, + "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if recordMetadata endpoint parameter is true)", "constantName": "org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" }, "CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer.", "constantName": "org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" } }, "properties": { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 53d9afe57c0..150614ac7d9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -65,7 +65,7 @@ public final class KafkaConstants { @Metadata(label = "producer", description = "The metadata (only configured if `recordMetadata` endpoint parameter is `true`)", javaType = "List<RecordMetadata>") - public static final String KAFKA_RECORDMETA = "org.apache.kafka.clients.producer.RecordMetadata"; + public static final String KAFKA_RECORD_META = "kafka.RECORD_META"; @Metadata(label = "consumer", description = "Can be used for forcing manual offset commit when using Kafka consumer.", javaType = "org.apache.camel.component.kafka.consumer.KafkaManualCommit") public static final String MANUAL_COMMIT = "CamelKafkaManualCommit"; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 77d3c4450c2..76d68286e1d 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -416,7 +416,7 @@ public class KafkaProducer extends DefaultAsyncProducer { // This sets an empty metadata for the very first message on the batch List<RecordMetadata> recordMetadata = new ArrayList<>(); if (configuration.isRecordMetadata()) { - exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadata); + exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadata); } while (recordIterable.hasNext()) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java index ba15dc32b77..992726eec3a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java @@ -75,11 +75,11 @@ public final class ProducerUtil { public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList) { if (body instanceof Exchange) { Exchange ex = (Exchange) body; - ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList); + ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList); } if (body instanceof Message) { Message msg = (Message) body; - msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadataList); + msg.setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList); } } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 7966e8108cd..5d6dafe3ca9 100755 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -534,21 +534,21 @@ public class KafkaProducerTest { } private void assertRecordMetadataTimestampExists() { - List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA); + List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META); assertNotNull(recordMetaData1); assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData"); assertNotNull(recordMetaData1.get(0)); } private void assertRecordMetadataExists() { - List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA); + List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META); assertNotNull(recordMetaData1); assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData"); assertNotNull(recordMetaData1.get(0)); } private void assertRecordMetadataExists(final int numMetadata) { - List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA); + List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORD_META); assertNotNull(recordMetaData1); assertEquals(recordMetaData1.size(), numMetadata, "Expected one recordMetaData"); assertNotNull(recordMetaData1.get(0)); @@ -558,7 +558,7 @@ public class KafkaProducerTest { List<Exchange> exchanges = (List<Exchange>) in.getBody(); for (Exchange ex : exchanges) { List<RecordMetadata> recordMetaData - = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORDMETA); + = (List<RecordMetadata>) ex.getMessage().getHeader(KafkaConstants.KAFKA_RECORD_META); assertNotNull(recordMetaData); assertEquals(1, recordMetaData.size(), "Expected one recordMetaData"); assertNotNull(recordMetaData.get(0)); @@ -568,7 +568,7 @@ public class KafkaProducerTest { private void assertRecordMetadataExistsForEachAggregatedMessage() { List<Message> messages = (List<Message>) in.getBody(); for (Message msg : messages) { - List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORDMETA); + List<RecordMetadata> recordMetaData = (List<RecordMetadata>) msg.getHeader(KafkaConstants.KAFKA_RECORD_META); assertNotNull(recordMetaData); assertEquals(1, recordMetaData.size(), "Expected one recordMetaData"); assertNotNull(recordMetaData.get(0)); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java index 998b8f355b2..07e8bcd95ef 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java @@ -206,7 +206,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport { for (Exchange exchange : exchangeList) { @SuppressWarnings("unchecked") List<RecordMetadata> recordMetaData1 - = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META)); assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected."); assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive"); assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'"); @@ -238,7 +238,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport { for (Exchange exchange : exchangeList) { @SuppressWarnings("unchecked") List<RecordMetadata> recordMetaData1 - = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META)); assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected."); assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive"); assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'"); @@ -299,7 +299,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport { assertEquals(2, exchangeList.size(), "Two Exchanges are expected"); Exchange e1 = exchangeList.get(0); @SuppressWarnings("unchecked") - List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META)); assertEquals(10, recordMetaData1.size(), "Ten RecordMetadata is expected."); for (RecordMetadata recordMeta : recordMetaData1) { assertTrue(recordMeta.offset() >= 0, "Offset is positive"); @@ -307,7 +307,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport { } Exchange e2 = exchangeList.get(1); @SuppressWarnings("unchecked") - List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META)); assertEquals(5, recordMetaData2.size(), "Five RecordMetadata is expected."); for (RecordMetadata recordMeta : recordMetaData2) { assertTrue(recordMeta.offset() >= 0, "Offset is positive"); @@ -346,7 +346,7 @@ public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport { for (Exchange exchange : exchangeList) { @SuppressWarnings("unchecked") List<RecordMetadata> recordMetaData1 - = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META)); assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected."); assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive"); assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'"); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java index f038dc0af85..51ac5f10cdf 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java @@ -163,7 +163,7 @@ public class KafkaTransactionIT extends BaseEmbeddedKafkaTestSupport { for (Exchange exchange : exchangeList) { @SuppressWarnings("unchecked") List<RecordMetadata> recordMetaData1 - = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORD_META)); assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected."); assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive"); assertTrue(recordMetaData1.get(0).topic().startsWith("transaction"), "Topic Name start with 'transaction'"); diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc index 63d3b3ad05a..c14c7e5f5bc 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc @@ -162,3 +162,9 @@ This option is only intended for the Load Balancer EIP. This makes the YAML sche === camel-hdfs The HDFS component has been deprecated, and planned to be removed in 4.4 (see CAMEL-20196). + +=== camel-kafka + +The header name for the `List<RecordMetadata>` metadata has changed from +`org.apache.kafka.clients.producer.RecordMetadata` to `kafka.RECORD_META`, +and the header constant from `KAFKA_RECORDMETA` to `KAFKA_RECORD_META`. \ No newline at end of file diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index cfdcf26558b..20c82a9b412 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -5338,11 +5338,10 @@ public interface KafkaEndpointBuilderFactory { * * Group: producer * - * @return the name of the header {@code - * org.apache.kafka.clients.producer.RecordMetadata}. + * @return the name of the header {@code kafka.RECORD_META}. */ - public String orgApacheKafkaClientsProducerRecordmetadata() { - return "org.apache.kafka.clients.producer.RecordMetadata"; + public String kafkaRecordMeta() { + return "kafka.RECORD_META"; } /**