Copilot commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3115597248


##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -98,19 +98,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     // This is a workaround for https://github.com/apache/pulsar/issues/19922
     private boolean collapsePartitionedTopics = false;
 
-    private final Cache<String, String> sanitizedTopicCache =
-            CacheBuilder.newBuilder().maximumSize(1000)
-                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
-    // Can't really safely expire these entries.  If we do, we could end up with
-    // a sanitized topic name that used in e.g. resume() after a long pause but can't be
-    // // re-resolved into a form usable for Pulsar.
-    private final Cache<String, String> desanitizedTopicCache =
-            CacheBuilder.newBuilder().build();
-
     private int maxBatchBitsForOffset = 12;
     private boolean useIndexAsOffset = true;
 
+    TopicPartitionResolver topicPartitionResolver;

Review Comment:
   `topicPartitionResolver` is currently package-private and mutable, which
   makes it easy for other code in the package (or tests) to accidentally
   reassign it or null it out. Consider making it `private` and assigning it
   exactly once in `open()`. It cannot be `final` in that case, because a
   `final` field must be assigned in a constructor or initializer. If tests
   need access to sanitization, expose a small `@VisibleForTesting`
   accessor/helper method instead of reaching into the field directly.
   ```suggestion
       private TopicPartitionResolver topicPartitionResolver;
   ```
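
A minimal sketch of the shape this could take, using only the JDK. `SinkSketch`, `Resolver`, `sanitizeForTest`, and the regex-based sanitization rule are hypothetical stand-ins for illustration, not the actual `KafkaConnectSink` code; in the real class the helper would presumably carry Guava's `@VisibleForTesting` annotation.

```java
import java.util.Objects;

// Hypothetical stand-in for KafkaConnectSink, reduced to the field in question.
class SinkSketch {

    // Hypothetical stand-in for TopicPartitionResolver.
    static final class Resolver {
        private final boolean sanitizeTopicName;

        Resolver(boolean sanitizeTopicName) {
            this.sanitizeTopicName = sanitizeTopicName;
        }

        String sanitize(String topic) {
            // Assumed rule for the sketch: replace anything outside [a-zA-Z0-9._-].
            return sanitizeTopicName
                    ? topic.replaceAll("[^a-zA-Z0-9._-]", "_")
                    : topic;
        }
    }

    // Private and non-final: assigned once in open(), never exposed directly.
    private Resolver resolver;

    void open(boolean sanitizeTopicName) {
        if (resolver != null) {
            throw new IllegalStateException("open() was already called");
        }
        resolver = new Resolver(sanitizeTopicName);
    }

    // Test-only helper; callers cannot reassign or null out the field through it.
    String sanitizeForTest(String topic) {
        return Objects.requireNonNull(resolver, "open() has not been called")
                .sanitize(topic);
    }
}
```

With this shape, tests call `sanitizeForTest(...)` after `open(...)` and never touch the field itself.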



##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -391,6 +388,94 @@ static class BatchMessageSequenceRef {
         int batchIdx;
     }
 
+    @Getter
+    @AllArgsConstructor
+    static class ResolvedTopicPartition {
+        private final String topic;
+        private final int partition;
+    }
+
+    static class TopicPartitionResolver {
+        private final String topicName;
+        private final boolean sanitizeTopicName;
+        private final boolean collapsePartitionedTopics;
+        private final Cache<String, String> sanitizedTopicCache =
+                CacheBuilder.newBuilder().maximumSize(1000)
+                        .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+        // Can't really safely expire these entries.  If we do, we could end up with
+        // a sanitized topic name that used in e.g. resume() after a long pause but can't be
+        // // re-resolved into a form usable for Pulsar.

Review Comment:
   Minor grammar/typo in this comment (missing “is”) and an extra `//` that 
looks accidental; cleaning it up will improve readability.
   ```suggestion
           // a sanitized topic name that is used in e.g. resume() after a long pause but can't be
           // re-resolved into a form usable for Pulsar.
   ```
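
For context on why the code comment matters, here is a rough sketch of the asymmetry it describes: the forward (sanitizing) cache can be bounded because an evicted entry is simply recomputed, while the reverse mapping cannot be rebuilt once dropped, since sanitization loses information. The sketch uses plain JDK collections instead of Guava's `CacheBuilder`, omits time-based expiry, and assumes a hypothetical regex sanitization rule; `TopicNameMapperSketch` and its methods are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the two caches; not the connector's real code.
class TopicNameMapperSketch {
    private static final int MAX_FORWARD_ENTRIES = 1000;

    // Forward cache: access-ordered LRU. Eviction is safe because the
    // sanitized name can always be recomputed from the Pulsar topic name.
    private final Map<String, String> sanitized =
            new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    return size() > MAX_FORWARD_ENTRIES;
                }
            };

    // Reverse mapping: never evicted. Sanitization is lossy, so a dropped
    // entry could not be resolved back to the original Pulsar topic name.
    private final Map<String, String> desanitized = new ConcurrentHashMap<>();

    synchronized String sanitize(String pulsarTopic) {
        String name = sanitized.computeIfAbsent(
                pulsarTopic, t -> t.replaceAll("[^a-zA-Z0-9._-]", "_"));
        // Record the reverse mapping every time, even after a forward eviction.
        desanitized.putIfAbsent(name, pulsarTopic);
        return name;
    }

    String desanitize(String sanitizedName) {
        // Fall back to the input when the name was never sanitized by this mapper.
        return desanitized.getOrDefault(sanitizedName, sanitizedName);
    }
}
```

A caller that pauses for a long time and later calls `desanitize(...)` still gets the original topic name back, which is the guarantee the comment is warning about.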



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
