This is an automated email from the ASF dual-hosted git repository.

Jason918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe5799e [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics (#1459)
1fe5799e is described below

commit 1fe5799e67a7d9bd4b7523649d56146619d8bfb6
Author: zhenJiangWang <[email protected]>
AuthorDate: Thu Jun 11 12:00:04 2026 +0800

    [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics (#1459)
    
    * [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
    
    * [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
    
    * Update pulsar/consumer_impl.go
    
    Co-authored-by: Copilot <[email protected]>
    
    * [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
    
    * [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
    
    * fix lint
    
    * [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
    
    ---------
    
    Co-authored-by: zjxxzjwang <[email protected]>
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: crossoverJie <[email protected]>
---
 pulsar/consumer_impl.go | 47 ++++++++++++++++++----------------
 pulsar/consumer_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 93 insertions(+), 21 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5848b93c..ffca996d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -156,32 +156,37 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + RetryTopicSuffix
                oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + DlqTopicSuffix
 
-               if r, err := 
client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
-                       r != nil &&
-                       r.Partitions > 0 {
-                       retryTopic = oldRetryTopic
-               }
-
-               if r, err := 
client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
-                       r != nil &&
-                       r.Partitions > 0 {
-                       dlqTopic = oldDlqTopic
+               // Check for old topic naming format.
+               // When DLQ policy is not provided, check both old topics for 
backward compatibility.
+               checkTopicIsExists := func(topic string) bool {
+                       r, err := 
client.lookupService.GetPartitionedTopicMetadata(topic)
+                       return err == nil && r != nil && r.Partitions > 0
+               }
+               resolveTopic := func(current, old, defaultTopic string) string {
+                       if current != "" {
+                               return current
+                       }
+                       if checkTopicIsExists(old) {
+                               return old
+                       }
+                       return defaultTopic
                }
-
                if options.DLQ == nil {
                        options.DLQ = &DLQPolicy{
-                               MaxDeliveries:    MaxReconsumeTimes,
-                               DeadLetterTopic:  dlqTopic,
-                               RetryLetterTopic: retryTopic,
-                       }
-               } else {
-                       if options.DLQ.DeadLetterTopic == "" {
-                               options.DLQ.DeadLetterTopic = dlqTopic
-                       }
-                       if options.DLQ.RetryLetterTopic == "" {
-                               options.DLQ.RetryLetterTopic = retryTopic
+                               MaxDeliveries: MaxReconsumeTimes,
                        }
                }
+               options.DLQ.DeadLetterTopic = resolveTopic(
+                       options.DLQ.DeadLetterTopic,
+                       oldDlqTopic,
+                       dlqTopic,
+               )
+               options.DLQ.RetryLetterTopic = resolveTopic(
+                       options.DLQ.RetryLetterTopic,
+                       oldRetryTopic,
+                       retryTopic,
+               )
+
                if options.Topic != "" && len(options.Topics) == 0 {
                        options.Topics = []string{options.Topic, 
options.DLQ.RetryLetterTopic}
                        options.Topic = ""
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index c3fc82d4..15fb3245 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -6441,3 +6441,70 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
                })
        }
 }
+
+// lookupServiceWrapper embeds the original LookupService and only overrides
+// GetPartitionedTopicMetadata to record the topics that were queried.
+type lookupServiceWrapper struct {
+       internal.LookupService
+       mu           sync.Mutex
+       calledTopics []string
+}
+
+func (w *lookupServiceWrapper) GetPartitionedTopicMetadata(topic string) 
(*internal.PartitionedTopicMetadata, error) {
+       w.mu.Lock()
+       w.calledTopics = append(w.calledTopics, topic)
+       w.mu.Unlock()
+       return &internal.PartitionedTopicMetadata{Partitions: 0}, nil
+}
+
+// TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata verifies that 
when custom DLQ and Retry
+// topics are provided in DLQPolicy, the resolveTopic function should NOT call 
GetPartitionedTopicMetadata
+// to check old-format DLQ/Retry topics. This ensures the optimization path 
works correctly.
+func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) {
+       // Create a real client with a short operation timeout so that the 
subsequent
+       // consumer creation (which requires a broker connection) fails quickly.
+       c, err := NewClient(ClientOptions{
+               URL:                     serviceURL,
+               MaxConnectionsPerBroker: 10,
+               OperationTimeout:        1 * time.Second,
+       })
+       assert.NoError(t, err)
+       defer c.Close()
+
+       // Replace the client's internal lookupService with our wrapper to 
intercept
+       // and record all GetPartitionedTopicMetadata calls.
+       realClient := c.(*client)
+       wrapper := &lookupServiceWrapper{LookupService: 
realClient.lookupService}
+       realClient.lookupService = wrapper
+
+       // Subscribe with custom DLQ and Retry topics specified.
+       // The Subscribe call will fail due to no broker connection, but the
+       // resolveTopic logic executes before the connection attempt.
+       _, _ = c.Subscribe(ConsumerOptions{
+               Topic:            "persistent://public/default/test-topic",
+               SubscriptionName: "test-subscription",
+               RetryEnable:      true,
+               DLQ: &DLQPolicy{
+                       MaxDeliveries:    3,
+                       DeadLetterTopic:  
"persistent://public/default/my-dlq-topic",
+                       RetryLetterTopic: 
"persistent://public/default/my-retry-topic",
+               },
+       })
+
+       // These are the old-format topics that resolveTopic would check via
+       // GetPartitionedTopicMetadata if no custom topics were provided.
+       oldDlqTopic := "persistent://public/default/test-subscription" + 
DlqTopicSuffix
+       oldRetryTopic := "persistent://public/default/test-subscription" + 
RetryTopicSuffix
+
+       // Verify that GetPartitionedTopicMetadata was never called with 
old-format topics.
+       // When custom DLQ/Retry topics are provided, resolveTopic should 
return them directly
+       // without checking whether old-format topics exist.
+       wrapper.mu.Lock()
+       defer wrapper.mu.Unlock()
+       for _, topic := range wrapper.calledTopics {
+               assert.NotEqual(t, oldDlqTopic, topic,
+                       "GetPartitionedTopicMetadata should not be called with 
old DLQ topic when custom DLQ topic is provided")
+               assert.NotEqual(t, oldRetryTopic, topic,
+                       "GetPartitionedTopicMetadata should not be called with 
old Retry topic when custom Retry topic is provided")
+       }
+}

Reply via email to