This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 45022c44c5826d3c1f350d4d362e148e5adf9bd2 Author: Enrique Fernández <[email protected]> AuthorDate: Mon Mar 16 09:11:38 2026 +0100 [fix][io][kca] kafka headers silently dropped (#25325) --- .../kafka/connect/AbstractKafkaConnectSource.java | 5 ++-- .../io/kafka/connect/KafkaConnectSource.java | 15 +++++++++++ .../io/kafka/connect/KafkaConnectSourceTest.java | 29 ++++++++++++++++++++++ 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index e048ddf7244..6ed9e7891b2 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -288,7 +288,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { public abstract AbstractKafkaSourceRecord<T> processSourceRecord(SourceRecord srcRecord); - private static final Map<String, String> PROPERTIES = Collections.emptyMap(); private static final Optional<Long> RECORD_SEQUENCE = Optional.empty(); public abstract class AbstractKafkaSourceRecord<T> implements Record { @@ -331,9 +330,11 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { return RECORD_SEQUENCE; } + Map<String, String> properties = Collections.emptyMap(); + @Override public Map<String, String> getProperties() { - return PROPERTIES; + return properties; } public boolean isEmpty() { diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 3d5d76d4230..e8a77b6de45 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder; import io.confluent.connect.avro.AvroData; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -240,6 +241,20 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte .map(e -> e.getKey() + "=" + e.getValue()) .collect(Collectors.joining(","))); this.partitionIndex = Optional.ofNullable(srcRecord.kafkaPartition()); + + // Propagate Kafka Connect record headers as Pulsar message properties. + if (srcRecord.headers() != null && !srcRecord.headers().isEmpty()) { + Map<String, String> headerProperties = new HashMap<>(); + for (var h : srcRecord.headers()) { + Object val = h.value(); + if (val != null) { + headerProperties.put(h.key(), val instanceof byte[] + ? Base64.getEncoder().encodeToString((byte[]) val) + : val.toString()); + } + } + this.properties = Collections.unmodifiableMap(headerProperties); + } } @Override diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index 5d414b6373c..8b264695905 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -300,6 +300,35 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase { runTransformTest(config, false); } + @Test + void testHeadersPropagatedAsProperties() throws Exception { + Map<String, Object> config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns"); + + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + Map<String, Object> sourcePartition = new HashMap<>(); + Map<String, Object> sourceOffset = new HashMap<>(); + sourcePartition.put("test", "test"); + sourceOffset.put("test", 0); + SourceRecord srcRecord = new SourceRecord( + sourcePartition, sourceOffset, topicName, null, + null, null, null, "value" + ); + srcRecord.headers().addString("event-type", "order.created"); + srcRecord.headers().addString("event-id", "abc-123"); + srcRecord.headers().addLong("event-version", 42L); + + KafkaSourceRecord record = kafkaConnectSource.processSourceRecord(srcRecord); + + assertEquals("order.created", record.getProperties().get("event-type")); + assertEquals("abc-123", record.getProperties().get("event-id")); + assertEquals("42", record.getProperties().get("event-version")); + assertEquals(3, record.getProperties().size()); + } + @Test void testShortTopicNames() throws Exception { Map<String, Object> config = getConfig();
