Copilot commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3115597248
##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -98,19 +98,11 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
private boolean collapsePartitionedTopics = false;
- private final Cache<String, String> sanitizedTopicCache =
- CacheBuilder.newBuilder().maximumSize(1000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
- // Can't really safely expire these entries. If we do, we could end up
with
- // a sanitized topic name that used in e.g. resume() after a long pause
but can't be
- // // re-resolved into a form usable for Pulsar.
- private final Cache<String, String> desanitizedTopicCache =
- CacheBuilder.newBuilder().build();
-
private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;
+ TopicPartitionResolver topicPartitionResolver;
Review Comment:
`topicPartitionResolver` is currently package-private and mutable, which
makes it easy for other code in the package (or tests) to accidentally
reassign/null it out. Consider making it `private final` (initialized in
`open()`), and if tests need access to sanitization, expose a small
`@VisibleForTesting` accessor/helper method instead of reaching into the field
directly.
```suggestion
private TopicPartitionResolver topicPartitionResolver;
```
##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -391,6 +388,94 @@ static class BatchMessageSequenceRef {
int batchIdx;
}
+ @Getter
+ @AllArgsConstructor
+ static class ResolvedTopicPartition {
+ private final String topic;
+ private final int partition;
+ }
+
+ static class TopicPartitionResolver {
+ private final String topicName;
+ private final boolean sanitizeTopicName;
+ private final boolean collapsePartitionedTopics;
+ private final Cache<String, String> sanitizedTopicCache =
+ CacheBuilder.newBuilder().maximumSize(1000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+ // Can't really safely expire these entries. If we do, we could end
up with
+ // a sanitized topic name that used in e.g. resume() after a long
pause but can't be
+ // // re-resolved into a form usable for Pulsar.
Review Comment:
Minor grammar/typo in this comment (missing “is”) and an extra `//` that
looks accidental; cleaning it up will improve readability.
```suggestion
// a sanitized topic name that is used in e.g. resume() after a long
pause but can't be
// re-resolved into a form usable for Pulsar.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]