This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new f0c0ea3  [fix][io] Fix acknowledgments not being sent when 
collapsePartitionedTopics=true in kafka-connect-adapter (#11)
f0c0ea3 is described below

commit f0c0ea3e7cbb29267732aa4a403d423d5f5403ab
Author: Malla Sandeep <[email protected]>
AuthorDate: Tue Apr 21 17:00:00 2026 +0530

    [fix][io] Fix acknowledgments not being sent when 
collapsePartitionedTopics=true in kafka-connect-adapter (#11)
    
    * added check for collapsePartitionedTopics when sending acknowledgments as 
well, and added integration tests for the same
    
    * added topicPartitionResolver and refactored required methods into it.
    
    * refactored the caches into the nested class
    
    * removed unnecessary reference to KafkaConnectSink
    
    * increased visibility of TopicPartitionResolver to package-private for 
tests.
    
    * typo in comment
    
    * resolve NullPointerException issue with retry
    
    * fixed minor typos and changed topicPartitionResolver access specifier to private
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 191 +++++++++++++--------
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  68 +++++++-
 2 files changed, 189 insertions(+), 70 deletions(-)

diff --git 
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
 
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 69f1959..14a3d67 100644
--- 
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ 
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -98,19 +98,11 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
     // Thi is a workaround for https://github.com/apache/pulsar/issues/19922
     private boolean collapsePartitionedTopics = false;
 
-    private final Cache<String, String> sanitizedTopicCache =
-            CacheBuilder.newBuilder().maximumSize(1000)
-                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
-    // Can't really safely expire these entries.  If we do, we could end up 
with
-    // a sanitized topic name that used in e.g. resume() after a long pause 
but can't be
-    // // re-resolved into a form usable for Pulsar.
-    private final Cache<String, String> desanitizedTopicCache =
-            CacheBuilder.newBuilder().build();
-
     private int maxBatchBitsForOffset = 12;
     private boolean useIndexAsOffset = true;
 
+    private TopicPartitionResolver topicPartitionResolver;
+
     @Override
     public void write(Record<GenericObject> sourceRecord) {
         if (log.isDebugEnabled()) {
@@ -198,19 +190,15 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
             x.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
         });
         task = (SinkTask) taskClass.getConstructor().newInstance();
+
+        topicPartitionResolver = new TopicPartitionResolver(
+                topicName,
+                sanitizeTopicName,
+                collapsePartitionedTopics);
+
         taskContext =
-                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, 
task::open, kafkaName -> {
-                    if (sanitizeTopicName) {
-                        String pulsarTopicName = 
desanitizedTopicCache.getIfPresent(kafkaName);
-                        if (log.isDebugEnabled()) {
-                            log.debug("desanitizedTopicCache got: kafkaName: 
{}, pulsarTopicName: {}",
-                                    kafkaName, pulsarTopicName);
-                        }
-                        return pulsarTopicName != null ? pulsarTopicName : 
kafkaName;
-                    } else {
-                        return kafkaName;
-                    }
-                });
+                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open,
+                        topicPartitionResolver::desanitizeTopicName);
         task.initialize(taskContext);
         task.start(configs.get(0));
 
@@ -264,6 +252,10 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
             ackUntil(lastNotFlushed, committedOffsets, Record::ack);
             log.info("Flush succeeded");
         } catch (Throwable t) {
+            if (committedOffsets == null) {
+                log.error("preCommit failed — retrying to preserve ordering", 
t);
+                return;
+            }
             log.error("error flushing pending records", t);
             ackUntil(lastNotFlushed, committedOffsets, Record::fail);
         } finally {
@@ -300,13 +292,13 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
             partitionOffset.put(tp.partition(), e.getValue().offset());
         }
 
+        int ackRequestedCount = 0;
         for (Record<GenericObject> r : pendingFlushQueue) {
-            final String topic = 
sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
-            final int partition = r.getPartitionIndex().orElse(0);
+            ResolvedTopicPartition resolved = 
topicPartitionResolver.resolve(r);
 
             Long lastCommittedOffset = null;
-            if (topicOffsets.containsKey(topic)) {
-                lastCommittedOffset = topicOffsets.get(topic).get(partition);
+            if (topicOffsets.containsKey(resolved.getTopic())) {
+                lastCommittedOffset = 
topicOffsets.get(resolved.getTopic()).get(resolved.getPartition());
             }
 
             if (lastCommittedOffset == null) {
@@ -326,15 +318,20 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
             }
 
             cb.accept(r);
+            ackRequestedCount++;
             pendingFlushQueue.remove(r);
             currentBatchSize.addAndGet(-1 * r.getMessage().get().size());
             if (r == lastNotFlushed) {
                 break;
             }
         }
+        if (log.isDebugEnabled()) {
+            log.debug("ackRequestedCount: {}, committedOffsets: {}", 
ackRequestedCount, committedOffsets);
+        }
     }
 
-    private long getMessageOffset(Record<GenericObject> sourceRecord) {
+    @VisibleForTesting
+    long getMessageOffset(Record<GenericObject> sourceRecord) {
 
         if (sourceRecord.getMessage().isPresent()) {
             // Use index added by 
org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
@@ -383,6 +380,11 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
                 .orElse(-1L);
     }
 
+    @VisibleForTesting
+    TopicPartitionResolver getTopicPartitionResolver() {
+        return topicPartitionResolver;
+    }
+
     @Getter
     @AllArgsConstructor
     static class BatchMessageSequenceRef {
@@ -391,6 +393,94 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
         int batchIdx;
     }
 
+    @Getter
+    @AllArgsConstructor
+    static class ResolvedTopicPartition {
+        private final String topic;
+        private final int partition;
+    }
+
+    static class TopicPartitionResolver {
+        private final String topicName;
+        private final boolean sanitizeTopicName;
+        private final boolean collapsePartitionedTopics;
+        private final Cache<String, String> sanitizedTopicCache =
+                CacheBuilder.newBuilder().maximumSize(1000)
+                        .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+        // Can't really safely expire these entries.  If we do, we could end 
up with
+        // a sanitized topic name that is used in e.g. resume() after a long 
pause but can't be
+        // re-resolved into a form usable for Pulsar.
+        private final Cache<String, String> desanitizedTopicCache =
+                CacheBuilder.newBuilder().build();
+
+        private TopicPartitionResolver(String topicName,
+                                       boolean sanitizeTopicName,
+                                       boolean collapsePartitionedTopics) {
+            this.topicName = topicName;
+            this.sanitizeTopicName = sanitizeTopicName;
+            this.collapsePartitionedTopics = collapsePartitionedTopics;
+        }
+
+        private ResolvedTopicPartition resolve(Record<GenericObject> 
sourceRecord) {
+            final int partition;
+            final String topic;
+
+            if (shouldCollapsePartitionedTopic(sourceRecord)) {
+                TopicName tn = 
TopicName.get(sourceRecord.getTopicName().get());
+                partition = tn.getPartitionIndex();
+                topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName());
+            } else {
+                partition = sourceRecord.getPartitionIndex().orElse(0);
+                topic = 
sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName));
+            }
+            return new ResolvedTopicPartition(topic, partition);
+        }
+
+        private String desanitizeTopicName(String kafkaName) {
+            if (sanitizeTopicName) {
+                String pulsarTopicName = 
desanitizedTopicCache.getIfPresent(kafkaName);
+                if (log.isDebugEnabled()) {
+                    log.debug("desanitizedTopicCache got: kafkaName: {}, 
pulsarTopicName: {}",
+                            kafkaName, pulsarTopicName);
+                }
+                return pulsarTopicName != null ? pulsarTopicName : kafkaName;
+            } else {
+                return kafkaName;
+            }
+        }
+
+        // Replace all non-letter, non-digit characters with underscore.
+        // Append underscore in front of name if it does not begin with 
alphabet or underscore.
+        String sanitizeNameIfNeeded(String name) {
+            if (!sanitizeTopicName) {
+                return name;
+            }
+
+            try {
+                return sanitizedTopicCache.get(name, () -> {
+                    String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", 
"_");
+                    if (sanitizedName.matches("^[^a-zA-Z_].*")) {
+                        sanitizedName = "_" + sanitizedName;
+                    }
+                    // do this once, sanitize() can be called on already 
sanitized name
+                    // so avoid replacing with (sanitizedName -> 
sanitizedName).
+                    desanitizedTopicCache.get(sanitizedName, () -> name);
+                    return sanitizedName;
+                });
+            } catch (ExecutionException e) {
+                log.error("Failed to get sanitized topic name for {}", name, 
e);
+                throw new IllegalStateException("Failed to get sanitized topic 
name for " + name, e);
+            }
+        }
+
+        private boolean shouldCollapsePartitionedTopic(Record<GenericObject> 
r) {
+            return collapsePartitionedTopics
+                    && r.getTopicName().isPresent()
+                    && TopicName.get(r.getTopicName().get()).isPartitioned();
+        }
+    }
+
     private static Method getMethodOfMessageId(MessageId messageId, String 
name) throws NoSuchMethodException {
         Class<?> clazz = messageId.getClass();
         NoSuchMethodException firstException = null;
@@ -437,19 +527,11 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
 
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
-        final int partition;
-        final String topic;
-
-        if (collapsePartitionedTopics
-                && sourceRecord.getTopicName().isPresent()
-                && 
TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
-            TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
-            partition = tn.getPartitionIndex();
-            topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), 
sanitizeTopicName);
-        } else {
-            partition = sourceRecord.getPartitionIndex().orElse(0);
-            topic = 
sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), 
sanitizeTopicName);
-        }
+
+        ResolvedTopicPartition resolved = 
topicPartitionResolver.resolve(sourceRecord);
+        final int partition = resolved.getPartition();
+        final String topic = resolved.getTopic();
+
         final Object key;
         final Object value;
         final Schema keySchema;
@@ -522,31 +604,6 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
 
     @VisibleForTesting
     protected long currentOffset(String topic, int partition) {
-        return taskContext.currentOffset(sanitizeNameIfNeeded(topic, 
sanitizeTopicName), partition);
+        return 
taskContext.currentOffset(topicPartitionResolver.sanitizeNameIfNeeded(topic), 
partition);
     }
-
-    // Replace all non-letter, non-digit characters with underscore.
-    // Append underscore in front of name if it does not begin with alphabet 
or underscore.
-    protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
-        if (!sanitize) {
-            return name;
-        }
-
-        try {
-            return sanitizedTopicCache.get(name, () -> {
-                String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_");
-                if (sanitizedName.matches("^[^a-zA-Z_].*")) {
-                    sanitizedName = "_" + sanitizedName;
-                }
-                // do this once, sanitize() can be called on already sanitized 
name
-                // so avoid replacing with (sanitizedName -> sanitizedName).
-                desanitizedTopicCache.get(sanitizedName, () -> name);
-                return sanitizedName;
-            });
-        } catch (ExecutionException e) {
-            log.error("Failed to get sanitized topic name for {}", name, e);
-            throw new IllegalStateException("Failed to get sanitized topic 
name for " + name, e);
-        }
-    }
-
-}
+}
\ No newline at end of file
diff --git 
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
 
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 8bcad68..74af038 100644
--- 
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ 
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -344,7 +344,8 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
 
         assertEquals(status.get(), 1);
 
-        final TopicPartition tp = new 
TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
+        final TopicPartition tp = new 
TopicPartition(sink.getTopicPartitionResolver()
+                .sanitizeNameIfNeeded(pulsarTopicName), 0);
         assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
         assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 
FunctionCommon.getSequenceId(msgId));
 
@@ -1574,7 +1575,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
         assertNull(ref);
 
         ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
-                        new TopicMessageIdImpl("topic-0", new 
MessageIdImpl(ledgerId, entryId, 0))
+                new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, 
entryId, 0))
         );
         assertNull(ref);
 
@@ -1648,7 +1649,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
         props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
 
         KafkaConnectSink sink = new KafkaConnectSink();
-            sink.open(props, context);
+        sink.open(props, context);
 
         AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> 
pulsarAvroSchema =
                 
AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
@@ -1682,6 +1683,67 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
         sink.close();
     }
 
+    @Test
+    public void testAckUntilWithCollapsePartitionedTopics() throws Exception {
+        testAckUntil(true,
+                "persistent://a/b/fake-topic-partition-0",
+                "persistent://a/b/fake-topic",
+                0);
+    }
+
+    @Test
+    public void testAckUntilWithoutCollapsePartitionedTopics() throws 
Exception {
+        // Note: Without collapsePartitionedTopics expectedPartition in the 
committedOffsets will always be 0
+        testAckUntil(false,
+                "persistent://a/b/fake-topic-partition-1",
+                "persistent://a/b/fake-topic-partition-1",
+                0);
+    }
+
+    private void testAckUntil(boolean collapseEnabled,
+                              String pulsarTopic,
+                              String expectedKafkaTopic,
+                              int expectedPartition) throws Exception {
+        // Setup sink with given collapseEnabled value
+        props.put("kafkaConnectorSinkClass", 
SchemaedFileStreamSinkConnector.class.getCanonicalName());
+        props.put("collapsePartitionedTopics", 
Boolean.toString(collapseEnabled));
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        // Create pulsar record with given pulsarTopic and expectedPartition
+        Message msg = mock(MessageImpl.class);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1, 
expectedPartition));
+        when(msg.getValue()).thenReturn(null);
+
+        AtomicInteger ackCount = new AtomicInteger(0);
+
+        Record<GenericObject> record = PulsarRecord.<GenericObject>builder()
+                .topicName(pulsarTopic)
+                .message(msg)
+                .ackFunction(ackCount::incrementAndGet)
+                .failFunction(() -> {})
+                .build();
+
+        // Add the pulsar record to pendingFlushQueue
+        sink.pendingFlushQueue.add(record);
+
+        // Build committedOffsets with the given expectedKafkaTopic and 
expectedPartition
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new 
HashMap<>();
+        committedOffsets.put(
+                new TopicPartition(expectedKafkaTopic, expectedPartition),
+                new OffsetAndMetadata(sink.getMessageOffset(record))
+        );
+
+        // Trigger ackUntil manually
+        sink.ackUntil(record, committedOffsets, Record::ack);
+
+        // Assert that the ackFunction runnable of the record is called and 
pendingFlushQueue is empty
+        Assert.assertEquals(ackCount.get(), 1);
+        Assert.assertTrue(sink.pendingFlushQueue.isEmpty());
+
+        sink.close();
+    }
+
     @SneakyThrows
     private java.util.Date getDateFromString(String dateInString) {
         SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy 
hh:mm:ss");

Reply via email to