sandeep-mst commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3105380496


##########
kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java:
##########
@@ -1682,6 +1682,67 @@ private void testCollapsePartitionedTopic(boolean isEnabled,
         sink.close();
     }
 
+    @Test
+    public void testAckUntilWithCollapsePartitionedTopics() throws Exception {
+        testAckUntil(true,
+                "persistent://a/b/fake-topic-partition-0",
+                "persistent://a/b/fake-topic",
+                0);
+    }
+
+    @Test
+    public void testAckUntilWithoutCollapsePartitionedTopics() throws Exception {
+        // Note: Without collapsePartitionedTopics expectedPartition in the committedOffsets will always be 0
+        testAckUntil(false,
+                "persistent://a/b/fake-topic-partition-1",
+                "persistent://a/b/fake-topic-partition-1",
+                0);
+    }
+
+    private void testAckUntil(boolean collapseEnabled,
+                              String pulsarTopic,
+                              String expectedKafkaTopic,
+                              int expectedPartition) throws Exception {
+        // Setup sink with given collapseEnabled value
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+        props.put("collapsePartitionedTopics", Boolean.toString(collapseEnabled));
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        // Create pulsar record with given pulsarTopic and expectedPartition
+        Message msg = mock(MessageImpl.class);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1, expectedPartition));
+        when(msg.getValue()).thenReturn(null);
+
+        AtomicInteger ackCount = new AtomicInteger(0);
+
+        Record<GenericObject> record = PulsarRecord.<GenericObject>builder()
+                .topicName(pulsarTopic)
+                .message(msg)
+                .ackFunction(ackCount::incrementAndGet)
+                .failFunction(() -> {})
+                .build();
+
+        // Add the pulsar record to pendingFlushQueue
+        sink.pendingFlushQueue.add(record);
+
+        // Build committedOffsets with the given expectedKafkaTopic and expectedPartition
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(
+                new TopicPartition(expectedKafkaTopic, expectedPartition),
+                new OffsetAndMetadata(sink.getMessageOffset(record))
+        );
+
+        // Trigger ackUntil manually

Review Comment:
   done.


