Copilot commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3030499510
##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -300,9 +300,18 @@ protected void ackUntil(Record<GenericObject> lastNotFlushed,
partitionOffset.put(tp.partition(), e.getValue().offset());
}
+ int ackRequestedCount = 0;
for (Record<GenericObject> r : pendingFlushQueue) {
- final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
- final int partition = r.getPartitionIndex().orElse(0);
+ final String topic;
+ final int partition;
+ if (shouldCollapsePartitionedTopic(r)) {
Review Comment:
`ackUntil()` assumes `committedOffsets` is non-null, but `flush()` can call
`ackUntil(lastNotFlushed, committedOffsets, Record::fail)` while
`committedOffsets` is still null if an exception is thrown before
`task.preCommit()` assigns it. That would throw a NullPointerException and leave
records stuck in `pendingFlushQueue`. Add a null/empty guard before iterating
over `committedOffsets`, and decide whether pending records should be failed or
acked in that case.
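A minimal sketch of the suggested guard, written against a simplified, self-contained model of the sink rather than the real connector classes. `PendingRecord`, `TopicPartitionKey`, and `failPending` are hypothetical stand-ins, not part of the connector's API. The sketch assumes one possible answer to the open question in the comment: when `committedOffsets` is null or empty, `ackUntil()` matches nothing, and the failure path fails every pending record up to and including `lastNotFlushed` so nothing stays stuck in `pendingFlushQueue`. Failing rather than acking is chosen here on the assumption that redelivery is safer than acknowledging records whose offsets were never committed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

public class AckUntilGuardSketch {

    // Hypothetical stand-in for Record<GenericObject>; counts ack()/fail() calls.
    static final class PendingRecord {
        final String topic;
        final int partition;
        final long offset;
        int acks;
        int fails;

        PendingRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        void ack() { acks++; }
        void fail() { fails++; }
    }

    // Hypothetical stand-in for Kafka's TopicPartition.
    record TopicPartitionKey(String topic, int partition) {}

    final Deque<PendingRecord> pendingFlushQueue = new ArrayDeque<>();

    // Simplified ackUntil: applies cb to each record whose offset is covered by a
    // committed offset, stopping after lastNotFlushed.
    void ackUntil(PendingRecord lastNotFlushed,
                  Map<TopicPartitionKey, Long> committedOffsets,
                  Consumer<PendingRecord> cb) {
        // Guard: with no committed offsets there is nothing to match against.
        if (committedOffsets == null || committedOffsets.isEmpty()) {
            return;
        }
        Iterator<PendingRecord> it = pendingFlushQueue.iterator();
        while (it.hasNext()) {
            PendingRecord r = it.next();
            Long committed = committedOffsets.get(new TopicPartitionKey(r.topic, r.partition));
            if (committed != null && r.offset <= committed) {
                cb.accept(r);
                it.remove();
            }
            if (r == lastNotFlushed) {
                break;
            }
        }
    }

    // Hypothetical failure path of a simplified flush(): committedOffsets may still be null.
    void failPending(PendingRecord lastNotFlushed, Map<TopicPartitionKey, Long> committedOffsets) {
        if (committedOffsets == null || committedOffsets.isEmpty()) {
            // Assumption: nothing was committed, so fail every record up to and
            // including lastNotFlushed instead of leaving it in the queue.
            Iterator<PendingRecord> it = pendingFlushQueue.iterator();
            while (it.hasNext()) {
                PendingRecord r = it.next();
                r.fail();
                it.remove();
                if (r == lastNotFlushed) {
                    break;
                }
            }
        } else {
            ackUntil(lastNotFlushed, committedOffsets, PendingRecord::fail);
        }
    }

    public static void main(String[] args) {
        AckUntilGuardSketch sink = new AckUntilGuardSketch();
        PendingRecord a = new PendingRecord("fake-topic", 0, 1L);
        PendingRecord b = new PendingRecord("fake-topic", 0, 2L);
        sink.pendingFlushQueue.add(a);
        sink.pendingFlushQueue.add(b);

        // Simulates preCommit() throwing before committedOffsets was assigned.
        sink.failPending(b, null);

        System.out.println(a.fails + " " + b.fails + " " + sink.pendingFlushQueue.size()); // prints "1 1 0"
    }
}
```

Putting the guard in both places keeps `ackUntil()` safe to call on its own, while the decision to fail records stays in the error path where the missing commit is known.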
##########
kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java:
##########
@@ -1682,6 +1682,67 @@ private void testCollapsePartitionedTopic(boolean isEnabled,
sink.close();
}
+ @Test
+ public void testAckUntilWithCollapsePartitionedTopics() throws Exception {
+ testAckUntil(true,
+ "persistent://a/b/fake-topic-partition-0",
+ "persistent://a/b/fake-topic",
+ 0);
+ }
+
+ @Test
+ public void testAckUntilWithoutCollapsePartitionedTopics() throws Exception {
+ // Note: Without collapsePartitionedTopics expectedPartition in the committedOffsets will always be 0
+ testAckUntil(false,
+ "persistent://a/b/fake-topic-partition-1",
+ "persistent://a/b/fake-topic-partition-1",
+ 0);
+ }
+
+ private void testAckUntil(boolean collapseEnabled,
+ String pulsarTopic,
+ String expectedKafkaTopic,
+ int expectedPartition) throws Exception {
+ // Setup sink with given collapseEnabled value
+ props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+ props.put("collapsePartitionedTopics", Boolean.toString(collapseEnabled));
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ // Create pulsar record with given pulsarTopic and expectedPartition
+ Message msg = mock(MessageImpl.class);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1, expectedPartition));
+ when(msg.getValue()).thenReturn(null);
+
+ AtomicInteger ackCount = new AtomicInteger(0);
+
+ Record<GenericObject> record = PulsarRecord.<GenericObject>builder()
+ .topicName(pulsarTopic)
+ .message(msg)
+ .ackFunction(ackCount::incrementAndGet)
+ .failFunction(() -> {})
+ .build();
+
+ // Add the pulsar record to pendingFlushQueue
+ sink.pendingFlushQueue.add(record);
+
+ // Build committedOffsets with the given expectedKafkaTopic and expectedPartition
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+ committedOffsets.put(
+ new TopicPartition(expectedKafkaTopic, expectedPartition),
+ new OffsetAndMetadata(sink.getMessageOffset(record))
+ );
+
+ // Trigger actUntil manually
Review Comment:
Typo in comment: “Trigger actUntil manually” should be “Trigger ackUntil
manually”.
```suggestion
// Trigger ackUntil manually
```
--
This is an automated message from the Apache Git Service.