sandeep-mst commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3105380496
##########
kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java:
##########
@@ -1682,6 +1682,67 @@ private void testCollapsePartitionedTopic(boolean
isEnabled,
sink.close();
}
+ @Test
+ public void testAckUntilWithCollapsePartitionedTopics() throws Exception {
+ testAckUntil(true,
+ "persistent://a/b/fake-topic-partition-0",
+ "persistent://a/b/fake-topic",
+ 0);
+ }
+
+ @Test
+ public void testAckUntilWithoutCollapsePartitionedTopics() throws
Exception {
+ // Note: Without collapsePartitionedTopics expectedPartition in the
committedOffsets will always be 0
+ testAckUntil(false,
+ "persistent://a/b/fake-topic-partition-1",
+ "persistent://a/b/fake-topic-partition-1",
+ 0);
+ }
+
+ private void testAckUntil(boolean collapseEnabled,
+ String pulsarTopic,
+ String expectedKafkaTopic,
+ int expectedPartition) throws Exception {
+ // Setup sink with given collapseEnabled value
+ props.put("kafkaConnectorSinkClass",
SchemaedFileStreamSinkConnector.class.getCanonicalName());
+ props.put("collapsePartitionedTopics",
Boolean.toString(collapseEnabled));
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ // Create pulsar record with given pulsarTopic and expectedPartition
+ Message msg = mock(MessageImpl.class);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1,
expectedPartition));
+ when(msg.getValue()).thenReturn(null);
+
+ AtomicInteger ackCount = new AtomicInteger(0);
+
+ Record<GenericObject> record = PulsarRecord.<GenericObject>builder()
+ .topicName(pulsarTopic)
+ .message(msg)
+ .ackFunction(ackCount::incrementAndGet)
+ .failFunction(() -> {})
+ .build();
+
+ // Add the pulsar record to pendingFlushQueue
+ sink.pendingFlushQueue.add(record);
+
+ // Build committedOffsets with the given expectedKafkaTopic and
expectedPartition
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new
HashMap<>();
+ committedOffsets.put(
+ new TopicPartition(expectedKafkaTopic, expectedPartition),
+ new OffsetAndMetadata(sink.getMessageOffset(record))
+ );
+
+ // Trigger actUntil manually
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]