lhotari commented on code in PR #25922:
URL: https://github.com/apache/pulsar/pull/25922#discussion_r3350029370
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1198,6 +1199,18 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
} else {
// if subscription exists, check if it's a non-durable subscription
if (subscription.getCursor() != null && !subscription.getCursor().isDurable()) {
Review Comment:
Just wondering whether the `!subscription.getCursor().isDurable()` check
could be performed before calling `asyncOpenCursor`.
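To illustrate the idea, here is a minimal sketch of such a pre-check. It is not the actual `PersistentTopic` code: `Cursor`, `Sub`, `PreCheckSketch`, `hasNonDurableCursor` and the `openCursorCalls` counter are hypothetical stand-ins, and the real `asyncOpenCursor` call is replaced by a stub. A pre-check like this would probably not rule out a race on its own, so the check in the callback may still be needed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PreCheckSketch {
    // Hypothetical stand-ins for ManagedCursor and PersistentSubscription.
    interface Cursor {
        boolean isDurable();
    }

    interface Sub {
        Cursor getCursor();
    }

    final Map<String, Sub> subscriptions = new ConcurrentHashMap<>();
    int openCursorCalls = 0; // counts how often the stubbed "open cursor" step runs

    // Same condition as in the diff, factored out so it can run before the open call.
    static boolean hasNonDurableCursor(Sub sub) {
        return sub != null && sub.getCursor() != null && !sub.getCursor().isDurable();
    }

    CompletableFuture<String> getDurableSubscription(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (hasNonDurableCursor(subscriptions.get(name))) {
            // Fail fast: the cursor is never opened for a conflicting name.
            future.completeExceptionally(new IllegalStateException(
                    "Subscription " + name + " already exists as non-durable"));
            return future;
        }
        openCursorCalls++; // stand-in for ledger.asyncOpenCursor(...)
        future.complete(name);
        return future;
    }

    public static void main(String[] args) {
        PreCheckSketch topic = new PreCheckSketch();
        // Register an existing subscription whose cursor is non-durable.
        topic.subscriptions.put("reader-sub", () -> () -> false);
        System.out.println(topic.getDurableSubscription("reader-sub").isCompletedExceptionally());
        System.out.println(topic.openCursorCalls);
    }
}
```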
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1198,6 +1199,18 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
} else {
// if subscription exists, check if it's a non-durable subscription
if (subscription.getCursor() != null && !subscription.getCursor().isDurable()) {
+ ledger.asyncDeleteCursor(encodedSubscriptionName, new DeleteCursorCallback() {
+     @Override
+     public void deleteCursorComplete(Object ctx) {
+     }
+
+     @Override
+     public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+         log.warn().attr("subscription", subscriptionName)
+                 .exceptionMessage(exception)
+                 .log("Failed to delete cursor for conflicts");
+     }
+ }, null);
Review Comment:
Just wondering whether this could result in the non-durable cursor getting removed.
https://github.com/apache/pulsar/blob/dd9462646ace229237d40e1d6036b9aa3c917599/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1086-L1096
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1184,7 +1184,8 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
}
Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
- ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
+ String encodedSubscriptionName = Codec.encode(subscriptionName);
+ ledger.asyncOpenCursor(encodedSubscriptionName, initialPosition, properties, subscriptionProperties,
Review Comment:
The default implementation seems to return the non-durable cursor and not
create a new one:
https://github.com/apache/pulsar/blob/dd9462646ace229237d40e1d6036b9aa3c917599/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1031-L1036
However, it's possible that there's a race where this would break.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
Review Comment:
The cursor returned here could be non-durable, at least in the default
ManagedLedgerImpl. Checking `cursor.isDurable()` could be useful too.
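A minimal sketch of what that extra check might look like, assuming the callback completes a future. `Cursor`, `CallbackCheckSketch` and `onOpenCursorComplete` are hypothetical stand-ins for illustration, not the real `OpenCursorCallback` API:

```java
import java.util.concurrent.CompletableFuture;

class CallbackCheckSketch {
    // Hypothetical stand-in for ManagedCursor.
    interface Cursor {
        boolean isDurable();
    }

    // Stand-in for the body of openCursorComplete: reject a non-durable cursor
    // instead of wrapping it in a durable subscription.
    static CompletableFuture<Cursor> onOpenCursorComplete(Cursor cursor) {
        CompletableFuture<Cursor> future = new CompletableFuture<>();
        if (!cursor.isDurable()) {
            future.completeExceptionally(new IllegalStateException(
                    "Opened cursor is non-durable; refusing to create a durable subscription"));
        } else {
            future.complete(cursor);
        }
        return future;
    }

    public static void main(String[] args) {
        Cursor nonDurable = () -> false;
        Cursor durable = () -> true;
        System.out.println(onOpenCursorComplete(nonDurable).isCompletedExceptionally());
        System.out.println(onOpenCursorComplete(durable).isCompletedExceptionally());
    }
}
```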