This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new f0c0ea3 [fix][io] Fix acknowledgments not being sent when
collapsePartitionedTopics=true in kafka-connect-adapter (#11)
f0c0ea3 is described below
commit f0c0ea3e7cbb29267732aa4a403d423d5f5403ab
Author: Malla Sandeep <[email protected]>
AuthorDate: Tue Apr 21 17:00:00 2026 +0530
[fix][io] Fix acknowledgments not being sent when
collapsePartitionedTopics=true in kafka-connect-adapter (#11)
* added a collapsePartitionedTopics check when sending acknowledgments, and
added integration tests covering it
* added topicPartitionResolver and refactored required methods into it.
* refactored the caches into the nested class
* removed unnecessary reference to KafkaConnectSink
* increased visibility of TopicPartitionResolver to package-private for
testing.
* fixed a typo in a comment
* resolved a NullPointerException when retrying
* fixed minor typos and changed the topicPartitionResolver access modifier to private
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 191 +++++++++++++--------
.../io/kafka/connect/KafkaConnectSinkTest.java | 68 +++++++-
2 files changed, 189 insertions(+), 70 deletions(-)
diff --git
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 69f1959..14a3d67 100644
---
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -98,19 +98,11 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
private boolean collapsePartitionedTopics = false;
- private final Cache<String, String> sanitizedTopicCache =
- CacheBuilder.newBuilder().maximumSize(1000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
- // Can't really safely expire these entries. If we do, we could end up
with
- // a sanitized topic name that used in e.g. resume() after a long pause
but can't be
- // // re-resolved into a form usable for Pulsar.
- private final Cache<String, String> desanitizedTopicCache =
- CacheBuilder.newBuilder().build();
-
private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;
+ private TopicPartitionResolver topicPartitionResolver;
+
@Override
public void write(Record<GenericObject> sourceRecord) {
if (log.isDebugEnabled()) {
@@ -198,19 +190,15 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
x.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
kafkaSinkConfig.getOffsetStorageTopic());
});
task = (SinkTask) taskClass.getConstructor().newInstance();
+
+ topicPartitionResolver = new TopicPartitionResolver(
+ topicName,
+ sanitizeTopicName,
+ collapsePartitionedTopics);
+
taskContext =
- new PulsarKafkaSinkTaskContext(configs.get(0), ctx,
task::open, kafkaName -> {
- if (sanitizeTopicName) {
- String pulsarTopicName =
desanitizedTopicCache.getIfPresent(kafkaName);
- if (log.isDebugEnabled()) {
- log.debug("desanitizedTopicCache got: kafkaName:
{}, pulsarTopicName: {}",
- kafkaName, pulsarTopicName);
- }
- return pulsarTopicName != null ? pulsarTopicName :
kafkaName;
- } else {
- return kafkaName;
- }
- });
+ new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open,
+ topicPartitionResolver::desanitizeTopicName);
task.initialize(taskContext);
task.start(configs.get(0));
@@ -264,6 +252,10 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
ackUntil(lastNotFlushed, committedOffsets, Record::ack);
log.info("Flush succeeded");
} catch (Throwable t) {
+ if (committedOffsets == null) {
+ log.error("preCommit failed — retrying to preserve ordering",
t);
+ return;
+ }
log.error("error flushing pending records", t);
ackUntil(lastNotFlushed, committedOffsets, Record::fail);
} finally {
@@ -300,13 +292,13 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
partitionOffset.put(tp.partition(), e.getValue().offset());
}
+ int ackRequestedCount = 0;
for (Record<GenericObject> r : pendingFlushQueue) {
- final String topic =
sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
- final int partition = r.getPartitionIndex().orElse(0);
+ ResolvedTopicPartition resolved =
topicPartitionResolver.resolve(r);
Long lastCommittedOffset = null;
- if (topicOffsets.containsKey(topic)) {
- lastCommittedOffset = topicOffsets.get(topic).get(partition);
+ if (topicOffsets.containsKey(resolved.getTopic())) {
+ lastCommittedOffset =
topicOffsets.get(resolved.getTopic()).get(resolved.getPartition());
}
if (lastCommittedOffset == null) {
@@ -326,15 +318,20 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
}
cb.accept(r);
+ ackRequestedCount++;
pendingFlushQueue.remove(r);
currentBatchSize.addAndGet(-1 * r.getMessage().get().size());
if (r == lastNotFlushed) {
break;
}
}
+ if (log.isDebugEnabled()) {
+ log.debug("ackRequestedCount: {}, committedOffsets: {}",
ackRequestedCount, committedOffsets);
+ }
}
- private long getMessageOffset(Record<GenericObject> sourceRecord) {
+ @VisibleForTesting
+ long getMessageOffset(Record<GenericObject> sourceRecord) {
if (sourceRecord.getMessage().isPresent()) {
// Use index added by
org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
@@ -383,6 +380,11 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
.orElse(-1L);
}
+ @VisibleForTesting
+ TopicPartitionResolver getTopicPartitionResolver() {
+ return topicPartitionResolver;
+ }
+
@Getter
@AllArgsConstructor
static class BatchMessageSequenceRef {
@@ -391,6 +393,94 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
int batchIdx;
}
+ @Getter
+ @AllArgsConstructor
+ static class ResolvedTopicPartition {
+ private final String topic;
+ private final int partition;
+ }
+
+ static class TopicPartitionResolver {
+ private final String topicName;
+ private final boolean sanitizeTopicName;
+ private final boolean collapsePartitionedTopics;
+ private final Cache<String, String> sanitizedTopicCache =
+ CacheBuilder.newBuilder().maximumSize(1000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+ // Can't really safely expire these entries. If we do, we could end
up with
+ // a sanitized topic name that is used in e.g. resume() after a long
pause but can't be
+ // re-resolved into a form usable for Pulsar.
+ private final Cache<String, String> desanitizedTopicCache =
+ CacheBuilder.newBuilder().build();
+
+ private TopicPartitionResolver(String topicName,
+ boolean sanitizeTopicName,
+ boolean collapsePartitionedTopics) {
+ this.topicName = topicName;
+ this.sanitizeTopicName = sanitizeTopicName;
+ this.collapsePartitionedTopics = collapsePartitionedTopics;
+ }
+
+ private ResolvedTopicPartition resolve(Record<GenericObject>
sourceRecord) {
+ final int partition;
+ final String topic;
+
+ if (shouldCollapsePartitionedTopic(sourceRecord)) {
+ TopicName tn =
TopicName.get(sourceRecord.getTopicName().get());
+ partition = tn.getPartitionIndex();
+ topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName());
+ } else {
+ partition = sourceRecord.getPartitionIndex().orElse(0);
+ topic =
sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName));
+ }
+ return new ResolvedTopicPartition(topic, partition);
+ }
+
+ private String desanitizeTopicName(String kafkaName) {
+ if (sanitizeTopicName) {
+ String pulsarTopicName =
desanitizedTopicCache.getIfPresent(kafkaName);
+ if (log.isDebugEnabled()) {
+ log.debug("desanitizedTopicCache got: kafkaName: {},
pulsarTopicName: {}",
+ kafkaName, pulsarTopicName);
+ }
+ return pulsarTopicName != null ? pulsarTopicName : kafkaName;
+ } else {
+ return kafkaName;
+ }
+ }
+
+ // Replace all non-letter, non-digit characters with underscore.
+ // Append underscore in front of name if it does not begin with
alphabet or underscore.
+ String sanitizeNameIfNeeded(String name) {
+ if (!sanitizeTopicName) {
+ return name;
+ }
+
+ try {
+ return sanitizedTopicCache.get(name, () -> {
+ String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]",
"_");
+ if (sanitizedName.matches("^[^a-zA-Z_].*")) {
+ sanitizedName = "_" + sanitizedName;
+ }
+ // do this once, sanitize() can be called on already
sanitized name
+ // so avoid replacing with (sanitizedName ->
sanitizedName).
+ desanitizedTopicCache.get(sanitizedName, () -> name);
+ return sanitizedName;
+ });
+ } catch (ExecutionException e) {
+ log.error("Failed to get sanitized topic name for {}", name,
e);
+ throw new IllegalStateException("Failed to get sanitized topic
name for " + name, e);
+ }
+ }
+
+ private boolean shouldCollapsePartitionedTopic(Record<GenericObject>
r) {
+ return collapsePartitionedTopics
+ && r.getTopicName().isPresent()
+ && TopicName.get(r.getTopicName().get()).isPartitioned();
+ }
+ }
+
private static Method getMethodOfMessageId(MessageId messageId, String
name) throws NoSuchMethodException {
Class<?> clazz = messageId.getClass();
NoSuchMethodException firstException = null;
@@ -437,19 +527,11 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
- final int partition;
- final String topic;
-
- if (collapsePartitionedTopics
- && sourceRecord.getTopicName().isPresent()
- &&
TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
- TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
- partition = tn.getPartitionIndex();
- topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(),
sanitizeTopicName);
- } else {
- partition = sourceRecord.getPartitionIndex().orElse(0);
- topic =
sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName),
sanitizeTopicName);
- }
+
+ ResolvedTopicPartition resolved =
topicPartitionResolver.resolve(sourceRecord);
+ final int partition = resolved.getPartition();
+ final String topic = resolved.getTopic();
+
final Object key;
final Object value;
final Schema keySchema;
@@ -522,31 +604,6 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
@VisibleForTesting
protected long currentOffset(String topic, int partition) {
- return taskContext.currentOffset(sanitizeNameIfNeeded(topic,
sanitizeTopicName), partition);
+ return
taskContext.currentOffset(topicPartitionResolver.sanitizeNameIfNeeded(topic),
partition);
}
-
- // Replace all non-letter, non-digit characters with underscore.
- // Append underscore in front of name if it does not begin with alphabet
or underscore.
- protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
- if (!sanitize) {
- return name;
- }
-
- try {
- return sanitizedTopicCache.get(name, () -> {
- String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_");
- if (sanitizedName.matches("^[^a-zA-Z_].*")) {
- sanitizedName = "_" + sanitizedName;
- }
- // do this once, sanitize() can be called on already sanitized
name
- // so avoid replacing with (sanitizedName -> sanitizedName).
- desanitizedTopicCache.get(sanitizedName, () -> name);
- return sanitizedName;
- });
- } catch (ExecutionException e) {
- log.error("Failed to get sanitized topic name for {}", name, e);
- throw new IllegalStateException("Failed to get sanitized topic
name for " + name, e);
- }
- }
-
-}
+}
\ No newline at end of file
diff --git
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 8bcad68..74af038 100644
---
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -344,7 +344,8 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
assertEquals(status.get(), 1);
- final TopicPartition tp = new
TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
+ final TopicPartition tp = new
TopicPartition(sink.getTopicPartitionResolver()
+ .sanitizeNameIfNeeded(pulsarTopicName), 0);
assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()),
FunctionCommon.getSequenceId(msgId));
@@ -1574,7 +1575,7 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
assertNull(ref);
ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
- new TopicMessageIdImpl("topic-0", new
MessageIdImpl(ledgerId, entryId, 0))
+ new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId,
entryId, 0))
);
assertNull(ref);
@@ -1648,7 +1649,7 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
KafkaConnectSink sink = new KafkaConnectSink();
- sink.open(props, context);
+ sink.open(props, context);
AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>
pulsarAvroSchema =
AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
@@ -1682,6 +1683,67 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
sink.close();
}
+ @Test
+ public void testAckUntilWithCollapsePartitionedTopics() throws Exception {
+ testAckUntil(true,
+ "persistent://a/b/fake-topic-partition-0",
+ "persistent://a/b/fake-topic",
+ 0);
+ }
+
+ @Test
+ public void testAckUntilWithoutCollapsePartitionedTopics() throws
Exception {
+ // Note: Without collapsePartitionedTopics expectedPartition in the
committedOffsets will always be 0
+ testAckUntil(false,
+ "persistent://a/b/fake-topic-partition-1",
+ "persistent://a/b/fake-topic-partition-1",
+ 0);
+ }
+
+ private void testAckUntil(boolean collapseEnabled,
+ String pulsarTopic,
+ String expectedKafkaTopic,
+ int expectedPartition) throws Exception {
+ // Setup sink with given collapseEnabled value
+ props.put("kafkaConnectorSinkClass",
SchemaedFileStreamSinkConnector.class.getCanonicalName());
+ props.put("collapsePartitionedTopics",
Boolean.toString(collapseEnabled));
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ // Create pulsar record with given pulsarTopic and expectedPartition
+ Message msg = mock(MessageImpl.class);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1,
expectedPartition));
+ when(msg.getValue()).thenReturn(null);
+
+ AtomicInteger ackCount = new AtomicInteger(0);
+
+ Record<GenericObject> record = PulsarRecord.<GenericObject>builder()
+ .topicName(pulsarTopic)
+ .message(msg)
+ .ackFunction(ackCount::incrementAndGet)
+ .failFunction(() -> {})
+ .build();
+
+ // Add the pulsar record to pendingFlushQueue
+ sink.pendingFlushQueue.add(record);
+
+ // Build committedOffsets with the given expectedKafkaTopic and
expectedPartition
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new
HashMap<>();
+ committedOffsets.put(
+ new TopicPartition(expectedKafkaTopic, expectedPartition),
+ new OffsetAndMetadata(sink.getMessageOffset(record))
+ );
+
+ // Trigger ackUntil manually
+ sink.ackUntil(record, committedOffsets, Record::ack);
+
+ // Assert that the ackFunction runnable of the record is called and
pendingFlushQueue is empty
+ Assert.assertEquals(ackCount.get(), 1);
+ Assert.assertTrue(sink.pendingFlushQueue.isEmpty());
+
+ sink.close();
+ }
+
@SneakyThrows
private java.util.Date getDateFromString(String dateInString) {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy
hh:mm:ss");