sandeep-mst commented on code in PR #11:
URL: https://github.com/apache/pulsar-connectors/pull/11#discussion_r3115641044
##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -98,19 +98,11 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
private boolean collapsePartitionedTopics = false;
- private final Cache<String, String> sanitizedTopicCache =
- CacheBuilder.newBuilder().maximumSize(1000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
- // Can't really safely expire these entries. If we do, we could end up
with
- // a sanitized topic name that used in e.g. resume() after a long pause
but can't be
- // // re-resolved into a form usable for Pulsar.
- private final Cache<String, String> desanitizedTopicCache =
- CacheBuilder.newBuilder().build();
-
private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;
+ TopicPartitionResolver topicPartitionResolver;
Review Comment:
done
##########
kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -391,6 +388,94 @@ static class BatchMessageSequenceRef {
int batchIdx;
}
+ @Getter
+ @AllArgsConstructor
+ static class ResolvedTopicPartition {
+ private final String topic;
+ private final int partition;
+ }
+
+ static class TopicPartitionResolver {
+ private final String topicName;
+ private final boolean sanitizeTopicName;
+ private final boolean collapsePartitionedTopics;
+ private final Cache<String, String> sanitizedTopicCache =
+ CacheBuilder.newBuilder().maximumSize(1000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+ // Can't really safely expire these entries. If we do, we could end
up with
+ // a sanitized topic name that used in e.g. resume() after a long
pause but can't be
+ // // re-resolved into a form usable for Pulsar.
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]