lhotari commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3030485081
##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -300,9 +300,18 @@ protected void ackUntil(Record<GenericObject> lastNotFlushed,
partitionOffset.put(tp.partition(), e.getValue().offset());
}
+ int ackRequestedCount = 0;
for (Record<GenericObject> r : pendingFlushQueue) {
- final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
- final int partition = r.getPartitionIndex().orElse(0);
+ final String topic;
+ final int partition;
+ if (shouldCollapsePartitionedTopic(r)) {
+ TopicName tn = TopicName.get(r.getTopicName().get());
+ partition = tn.getPartitionIndex();
+ topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
+ } else {
+ partition = r.getPartitionIndex().orElse(0);
+ topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
+ }
Review Comment:
Since this logic is duplicated in the toSourceRecord method, it could be
useful to extract it into a private inner class, and to move
shouldCollapsePartitionedTopic and other directly related methods there too,
unless the parent class needs them. The logic could run in the constructor,
so that the topic and partition can simply be read from the instance once it
has been created.
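
A minimal sketch of what such a class could look like, not a drop-in
implementation. `ResolvedTopicPartition`, `RecordView`, and the `collapse` flag
are hypothetical names used only for illustration: `RecordView` stands in for
`Record<GenericObject>`, the `-partition-N` suffix parsing stands in for the
`TopicName.get(...)` calls in the diff, `collapse` stands in for whatever
`shouldCollapsePartitionedTopic` decides, and the `sanitizeNameIfNeeded` call
is left out so that the example stays self-contained.

```java
import java.util.Optional;

final class TopicPartitionSketch {

    /** Hypothetical stand-in for the two Record accessors used in the diff. */
    interface RecordView {
        Optional<String> getTopicName();
        Optional<Integer> getPartitionIndex();

        static RecordView of(String topic, Integer partition) {
            return new RecordView() {
                @Override
                public Optional<String> getTopicName() {
                    return Optional.ofNullable(topic);
                }

                @Override
                public Optional<Integer> getPartitionIndex() {
                    return Optional.ofNullable(partition);
                }
            };
        }
    }

    /** Resolves topic and partition once, in the constructor. */
    static final class ResolvedTopicPartition {
        // Suffix Pulsar appends to the individual partitions of a partitioned topic.
        private static final String PARTITION_SUFFIX = "-partition-";

        private final String topic;
        private final int partition;

        ResolvedTopicPartition(RecordView r, String defaultTopic, boolean collapse) {
            String name = r.getTopicName().orElse(defaultTopic);
            int parsed = collapse ? parsePartitionIndex(name) : -1;
            if (parsed >= 0) {
                // Collapse "base-partition-N" into topic "base", partition N.
                this.partition = parsed;
                this.topic = name.substring(0, name.lastIndexOf(PARTITION_SUFFIX));
            } else {
                // Fall back to the record's own partition index, defaulting to 0.
                this.partition = r.getPartitionIndex().orElse(0);
                this.topic = name;
            }
        }

        /** Returns the partition index encoded in the name, or -1 if there is none. */
        private static int parsePartitionIndex(String name) {
            int pos = name.lastIndexOf(PARTITION_SUFFIX);
            if (pos < 0) {
                return -1;
            }
            try {
                int n = Integer.parseInt(name.substring(pos + PARTITION_SUFFIX.length()));
                return n >= 0 ? n : -1;
            } catch (NumberFormatException e) {
                return -1;
            }
        }

        String topic() {
            return topic;
        }

        int partition() {
            return partition;
        }
    }
}
```

With something along these lines, both ackUntil and toSourceRecord could
construct one instance per record and read `topic()` and `partition()` from it,
so the branching would live in a single place.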