Denovo1998 commented on code in PR #25443:
URL: https://github.com/apache/pulsar/pull/25443#discussion_r3131042853
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -550,31 +550,39 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
}
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
- getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
- if (metadata != null && metadata.partitions > 0) {
- validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {
Review Comment:
The original implementation attached a single exceptionally(...) to the
outermost stage, which guaranteed that the AsyncResponse was always resumed.
Now that the outermost stage is isCurrentClusterAllowed(...).thenAccept(...),
no error handling is attached at that level. If an exception is thrown while
reading the namespace/topic policy, the suspended request hangs until it
times out. Please keep a single exceptionally/whenComplete on the outermost
stage.
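
A minimal sketch of the pattern this comment asks for, not the PR's code.
FakeAsyncResponse and the isCurrentClusterAllowed(boolean) helper are
hypothetical stand-ins for the JAX-RS AsyncResponse and the broker lookup, so
that the example runs on its own. The point is only that one exceptionally(...)
on the outermost stage resumes the response whichever inner stage fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class OuterErrorHandlingSketch {
    /** Hypothetical stand-in for AsyncResponse: records the first value it is resumed with. */
    static final class FakeAsyncResponse {
        final AtomicReference<Object> result = new AtomicReference<>();

        boolean resume(Object value) {
            return result.compareAndSet(null, value);
        }

        boolean isResumed() {
            return result.get() != null;
        }
    }

    /** Hypothetical stand-in for the policy lookup; fails when failLookup is true. */
    static CompletableFuture<Boolean> isCurrentClusterAllowed(boolean failLookup) {
        return failLookup
                ? CompletableFuture.failedFuture(new IllegalStateException("policy read failed"))
                : CompletableFuture.completedFuture(true);
    }

    static void handle(FakeAsyncResponse response, boolean failLookup) {
        isCurrentClusterAllowed(failLookup)
                .thenAccept(allowed -> response.resume("created"))
                // Outermost handler: runs if the lookup or any stage above it failed.
                .exceptionally(ex -> {
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    response.resume(cause);
                    return null;
                });
    }

    public static void main(String[] args) {
        FakeAsyncResponse response = new FakeAsyncResponse();
        handle(response, true);
        System.out.println("resumed after failed lookup: " + response.isResumed());
    }
}
```

Without the trailing exceptionally(...), a failed lookup in this sketch would
leave the response unresumed, which is the hang described above.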
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -550,31 +550,39 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
}
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
- getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
- if (metadata != null && metadata.partitions > 0) {
- validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {
+ if (!allowed) {
+ resumeAsyncResponseExceptionally(asyncResponse,
Review Comment:
Calling resumeAsyncResponseExceptionally(...) here does not stop the rest of
the chain, so even when allowed=false, the subsequent
getPartitionedTopicMetadataAsync(...) -> tryCreatePartitionsAsync(...) still
runs. The client may receive a failure, but the partitions are recreated
anyway.
Could this be changed to a single thenCompose chain that returns a failed
future directly when the cluster is not allowed?
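
A rough sketch of the suggested shape, under stated assumptions: the
tryCreatePartitions() helper, the createCalls counter, and the boolean
parameter are hypothetical and only exist to make the example self-contained.
Returning a failed future from inside thenCompose means the creation step is
never invoked when the check fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ShortCircuitChainSketch {
    /** Counts how often the (hypothetical) creation step actually ran. */
    static final AtomicInteger createCalls = new AtomicInteger();

    /** Hypothetical stand-in for the step that has the side effect. */
    static CompletableFuture<Void> tryCreatePartitions() {
        createCalls.incrementAndGet();
        return CompletableFuture.completedFuture(null);
    }

    /** Single chain: a rejected check yields a failed future and skips creation. */
    static CompletableFuture<Void> createMissedPartitions(boolean allowed) {
        return CompletableFuture.completedFuture(allowed)
                .thenCompose(ok -> {
                    if (!ok) {
                        return CompletableFuture.<Void>failedFuture(
                                new IllegalStateException("cluster not allowed"));
                    }
                    return tryCreatePartitions();
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> rejected = createMissedPartitions(false);
        System.out.println("failed: " + rejected.isCompletedExceptionally()
                + ", create calls: " + createCalls.get());
    }
}
```

The caller can then attach one whenComplete/exceptionally to the returned
future and resume the response exactly once.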
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -149,6 +149,13 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
});
waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
+ try {
+ admin2.topics().createMissedPartitions(topicName);
Review Comment:
This test currently asserts only that the admin call throws; it does not
assert that the partition is still absent after the failure. Because "request
rejected" and "tryCreatePartitionsAsync continues" are two separate paths in
the implementation, checking the exception message alone cannot rule out a
false positive where the response fails but the side effect has already
occurred. Please add an assertion that topicName-partition-0 has not been
recreated, or that the corresponding replicator remains stopped.
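
To illustrate why the extra assertion matters, here is a small self-contained
sketch. FakeAdmin is a hypothetical in-memory stand-in, not the Pulsar admin
client; its leaksSideEffect flag simulates an implementation that rejects the
request but still creates the partition. A check on the exception alone passes
in both cases, while the combined check does not.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class SideEffectAssertionSketch {
    /** Hypothetical in-memory stand-in for the admin client; not the Pulsar API. */
    static final class FakeAdmin {
        final Set<String> topics = ConcurrentHashMap.newKeySet();
        final boolean leaksSideEffect;

        FakeAdmin(boolean leaksSideEffect) {
            this.leaksSideEffect = leaksSideEffect;
        }

        /** Always rejects; optionally creates the partition first to simulate the bug. */
        void createMissedPartitions(String topicName) {
            if (leaksSideEffect) {
                topics.add(topicName + "-partition-0");
            }
            throw new IllegalStateException("cluster not allowed");
        }
    }

    /** True only if the call was rejected AND partition 0 was not created. */
    static boolean rejectedWithoutSideEffect(FakeAdmin admin, String topicName) {
        boolean rejected = false;
        try {
            admin.createMissedPartitions(topicName);
        } catch (IllegalStateException expected) {
            rejected = true;
        }
        return rejected && !admin.topics.contains(topicName + "-partition-0");
    }

    public static void main(String[] args) {
        System.out.println(rejectedWithoutSideEffect(new FakeAdmin(false), "tp"));
        System.out.println(rejectedWithoutSideEffect(new FakeAdmin(true), "tp"));
    }
}
```

In the real test the second half of the check would query the broker or admin
API for the partition or replicator state rather than an in-memory set.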