sandeep-mst commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3115641044


##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -98,19 +98,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     // Thi is a workaround for https://github.com/apache/pulsar/issues/19922
     private boolean collapsePartitionedTopics = false;
 
-    private final Cache<String, String> sanitizedTopicCache =
-            CacheBuilder.newBuilder().maximumSize(1000)
-                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
-    // Can't really safely expire these entries.  If we do, we could end up with
-    // a sanitized topic name that used in e.g. resume() after a long pause but can't be
-    // // re-resolved into a form usable for Pulsar.
-    private final Cache<String, String> desanitizedTopicCache =
-            CacheBuilder.newBuilder().build();
-
     private int maxBatchBitsForOffset = 12;
     private boolean useIndexAsOffset = true;
 
+    TopicPartitionResolver topicPartitionResolver;

Review Comment:
   done



##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -391,6 +388,94 @@ static class BatchMessageSequenceRef {
         int batchIdx;
     }
 
+    @Getter
+    @AllArgsConstructor
+    static class ResolvedTopicPartition {
+        private final String topic;
+        private final int partition;
+    }
+
+    static class TopicPartitionResolver {
+        private final String topicName;
+        private final boolean sanitizeTopicName;
+        private final boolean collapsePartitionedTopics;
+        private final Cache<String, String> sanitizedTopicCache =
+                CacheBuilder.newBuilder().maximumSize(1000)
+                        .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+        // Can't really safely expire these entries.  If we do, we could end up with
+        // a sanitized topic name that used in e.g. resume() after a long pause but can't be
+        // // re-resolved into a form usable for Pulsar.

Review Comment:
   done.


