Denovo1998 commented on code in PR #25443:
URL: https://github.com/apache/pulsar/pull/25443#discussion_r3131042853


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -550,31 +550,39 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
     }
 
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
-        getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
-            if (metadata != null && metadata.partitions > 0) {
-                validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+        pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {

Review Comment:
   The original implementation attached a single exceptionally(...) to the
outermost stage, which guaranteed that the AsyncResponse was always resumed.
Now that the outermost stage is isCurrentClusterAllowed(...).thenAccept(...),
no error handler is attached to the outer chain. If reading the
namespace/topic policies throws, the suspended request hangs until it times
out. The outermost exceptionally/whenComplete handler should be retained.
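   
   A minimal, self-contained sketch of the difference. FakeResponse and
isAllowed are hypothetical stand-ins for AsyncResponse and
isCurrentClusterAllowed; they are not Pulsar APIs, and the real method
would resume the response with a proper REST error:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class OuterHandlerSketch {
    /** Hypothetical stand-in for a suspended AsyncResponse. */
    static final class FakeResponse {
        final AtomicReference<Object> result = new AtomicReference<>();

        void resume(Object value) {
            result.compareAndSet(null, value);
        }

        boolean isResumed() {
            return result.get() != null;
        }
    }

    /** Hypothetical policy lookup that may complete exceptionally. */
    static CompletableFuture<Boolean> isAllowed(boolean failLookup) {
        return failLookup
                ? CompletableFuture.failedFuture(new IllegalStateException("policy read failed"))
                : CompletableFuture.completedFuture(true);
    }

    static void withoutOuterHandler(FakeResponse response, boolean failLookup) {
        // If the lookup fails, thenAccept is skipped and nothing resumes the response.
        isAllowed(failLookup).thenAccept(allowed -> response.resume("ok"));
    }

    static void withOuterHandler(FakeResponse response, boolean failLookup) {
        isAllowed(failLookup)
                .thenAccept(allowed -> response.resume("ok"))
                // Terminal handler: any failure earlier in the chain lands here.
                .exceptionally(ex -> {
                    response.resume(ex.getCause() != null ? ex.getCause() : ex);
                    return null;
                });
    }

    public static void main(String[] args) {
        FakeResponse hung = new FakeResponse();
        withoutOuterHandler(hung, true);
        System.out.println("without handler resumed: " + hung.isResumed());

        FakeResponse handled = new FakeResponse();
        withOuterHandler(handled, true);
        System.out.println("with handler resumed: " + handled.isResumed());
    }
}
```

   With a failing lookup, only the second variant resumes the response.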



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -550,31 +550,39 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
     }
 
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
-        getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
-            if (metadata != null && metadata.partitions > 0) {
-                validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+        pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {
+            if (!allowed) {
+                resumeAsyncResponseExceptionally(asyncResponse,

Review Comment:
   resumeAsyncResponseExceptionally(...) does not stop the rest of the chain,
so even when allowed=false, the subsequent
getPartitionedTopicMetadataAsync(...) -> tryCreatePartitionsAsync(...) still
runs. The client may receive a failure, yet the partitions are still
recreated.
   
   Could this be changed to a single thenCompose chain that returns a failed
future directly when the cluster is not allowed?
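   
   A rough sketch of the suggested shape, under stated assumptions: every
method below is a hypothetical stand-in (not the PersistentTopicsBase
methods), and the created list only exists to make the side effect
observable. The caller would attach one whenComplete/exceptionally to the
returned future to resume the AsyncResponse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SingleChainSketch {
    /** Records which partitions were "created" so the side effect is visible. */
    final List<String> created = new ArrayList<>();

    /** Hypothetical stand-in for the cluster-allowed check. */
    CompletableFuture<Boolean> isAllowed(boolean allowed) {
        return CompletableFuture.completedFuture(allowed);
    }

    /** Hypothetical stand-in for the partitioned-topic metadata lookup. */
    CompletableFuture<Integer> getPartitionCount() {
        return CompletableFuture.completedFuture(2);
    }

    /** Hypothetical stand-in for the partition creation step. */
    CompletableFuture<Void> createPartitions(int count) {
        for (int i = 0; i < count; i++) {
            created.add("partition-" + i);
        }
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> createMissedPartitions(boolean allowed) {
        return isAllowed(allowed)
                .thenCompose(ok -> {
                    if (!ok) {
                        // A failed future short-circuits every later stage.
                        return CompletableFuture.<Integer>failedFuture(
                                new IllegalStateException("cluster not allowed"));
                    }
                    return getPartitionCount();
                })
                .thenCompose(this::createPartitions);
    }

    public static void main(String[] args) {
        SingleChainSketch sketch = new SingleChainSketch();
        CompletableFuture<Void> rejected = sketch.createMissedPartitions(false);
        System.out.println("failed: " + rejected.isCompletedExceptionally());
        System.out.println("created: " + sketch.created);
    }
}
```

   Because the rejection is expressed as a failed future rather than as a
call on the response object, the creation stage cannot run after it.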



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -149,6 +149,13 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
         });
         waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
 
+        try {
+            admin2.topics().createMissedPartitions(topicName);

Review Comment:
   This test currently asserts only that the admin call throws. It does not
assert that the partition is still absent after the failure. Because
"request rejected" and "chain continues into tryCreatePartitionsAsync" are two
separate paths in the implementation, checking the exception message alone
cannot rule out a false positive in which the response fails but the side
effect has already happened. Please add an assertion confirming that
topicName-partition-0 has not been recreated, or that the corresponding
replicator remains stopped.
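   
   To illustrate why the exception check alone is not enough, here is a
small simulation. It is deliberately buggy, uses no Pulsar APIs, and all
names are hypothetical: the call fails as expected, yet the partition is
created anyway, so only a second assertion on the state would catch it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class SideEffectAssertionSketch {
    /** Hypothetical record of existing partitions. */
    final Set<String> partitions = new HashSet<>();

    /** Deliberately buggy: reports the rejection but still creates the partition. */
    CompletableFuture<Void> buggyCreateMissedPartitions(String topic) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        CompletableFuture.completedFuture(false).thenAccept(allowed -> {
            if (!allowed) {
                response.completeExceptionally(new IllegalStateException("not allowed"));
                // Missing early return: execution falls through to the side effect.
            }
            partitions.add(topic + "-partition-0");
        });
        return response;
    }

    public static void main(String[] args) {
        SideEffectAssertionSketch sketch = new SideEffectAssertionSketch();
        CompletableFuture<Void> call = sketch.buggyCreateMissedPartitions("tp");

        // An exception-only assertion passes even though the code is wrong.
        System.out.println("call failed: " + call.isCompletedExceptionally());

        // Asserting on the state exposes the bug: the partition exists.
        System.out.println("partition exists: " + sketch.partitions.contains("tp-partition-0"));
    }
}
```

   In the real test, the equivalent check would be made against the
cluster state after the rejected call.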



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
