This is an automated email from the ASF dual-hosted git repository.
Jason918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1fe5799e [Issue #1458] Reduce unnecessary creation of retry topics and
DLQ topics (#1459)
1fe5799e is described below
commit 1fe5799e67a7d9bd4b7523649d56146619d8bfb6
Author: zhenJiangWang <[email protected]>
AuthorDate: Thu Jun 11 12:00:04 2026 +0800
[Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
(#1459)
* [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
* [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
* Update pulsar/consumer_impl.go
Co-authored-by: Copilot <[email protected]>
* [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
* [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
* fix lint
* [Issue #1458] Reduce unnecessary creation of retry topics and DLQ topics
---------
Co-authored-by: zjxxzjwang <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: crossoverJie <[email protected]>
---
pulsar/consumer_impl.go | 47 ++++++++++++++++++----------------
pulsar/consumer_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 93 insertions(+), 21 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5848b93c..ffca996d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -156,32 +156,37 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" +
options.SubscriptionName + RetryTopicSuffix
oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" +
options.SubscriptionName + DlqTopicSuffix
- if r, err :=
client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
- r != nil &&
- r.Partitions > 0 {
- retryTopic = oldRetryTopic
- }
-
- if r, err :=
client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
- r != nil &&
- r.Partitions > 0 {
- dlqTopic = oldDlqTopic
+ // Check for old topic naming format.
+ // When DLQ policy is not provided, check both old topics for
backward compatibility.
+ checkTopicIsExists := func(topic string) bool {
+ r, err :=
client.lookupService.GetPartitionedTopicMetadata(topic)
+ return err == nil && r != nil && r.Partitions > 0
+ }
+ resolveTopic := func(current, old, defaultTopic string) string {
+ if current != "" {
+ return current
+ }
+ if checkTopicIsExists(old) {
+ return old
+ }
+ return defaultTopic
}
-
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
- MaxDeliveries: MaxReconsumeTimes,
- DeadLetterTopic: dlqTopic,
- RetryLetterTopic: retryTopic,
- }
- } else {
- if options.DLQ.DeadLetterTopic == "" {
- options.DLQ.DeadLetterTopic = dlqTopic
- }
- if options.DLQ.RetryLetterTopic == "" {
- options.DLQ.RetryLetterTopic = retryTopic
+ MaxDeliveries: MaxReconsumeTimes,
}
}
+ options.DLQ.DeadLetterTopic = resolveTopic(
+ options.DLQ.DeadLetterTopic,
+ oldDlqTopic,
+ dlqTopic,
+ )
+ options.DLQ.RetryLetterTopic = resolveTopic(
+ options.DLQ.RetryLetterTopic,
+ oldRetryTopic,
+ retryTopic,
+ )
+
if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic,
options.DLQ.RetryLetterTopic}
options.Topic = ""
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index c3fc82d4..15fb3245 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -6441,3 +6441,70 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
})
}
}
+
+// lookupServiceWrapper embeds the original LookupService and only overrides
+// GetPartitionedTopicMetadata to record the topics that were queried.
+type lookupServiceWrapper struct {
+ internal.LookupService
+ mu sync.Mutex
+ calledTopics []string
+}
+
+func (w *lookupServiceWrapper) GetPartitionedTopicMetadata(topic string)
(*internal.PartitionedTopicMetadata, error) {
+ w.mu.Lock()
+ w.calledTopics = append(w.calledTopics, topic)
+ w.mu.Unlock()
+ return &internal.PartitionedTopicMetadata{Partitions: 0}, nil
+}
+
+// TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata verifies that
when custom DLQ and Retry
+// topics are provided in DLQPolicy, the resolveTopic function should NOT call
GetPartitionedTopicMetadata
+// to check old-format DLQ/Retry topics. This ensures the optimization path
works correctly.
+func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) {
+ // Create a real client with a short operation timeout so that the
subsequent
+ // consumer creation (which requires a broker connection) fails quickly.
+ c, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MaxConnectionsPerBroker: 10,
+ OperationTimeout: 1 * time.Second,
+ })
+ assert.NoError(t, err)
+ defer c.Close()
+
+ // Replace the client's internal lookupService with our wrapper to
intercept
+ // and record all GetPartitionedTopicMetadata calls.
+ realClient := c.(*client)
+ wrapper := &lookupServiceWrapper{LookupService:
realClient.lookupService}
+ realClient.lookupService = wrapper
+
+ // Subscribe with custom DLQ and Retry topics specified.
+ // The Subscribe call will fail due to no broker connection, but the
+ // resolveTopic logic executes before the connection attempt.
+ _, _ = c.Subscribe(ConsumerOptions{
+ Topic: "persistent://public/default/test-topic",
+ SubscriptionName: "test-subscription",
+ RetryEnable: true,
+ DLQ: &DLQPolicy{
+ MaxDeliveries: 3,
+ DeadLetterTopic:
"persistent://public/default/my-dlq-topic",
+ RetryLetterTopic:
"persistent://public/default/my-retry-topic",
+ },
+ })
+
+ // These are the old-format topics that resolveTopic would check via
+ // GetPartitionedTopicMetadata if no custom topics were provided.
+ oldDlqTopic := "persistent://public/default/test-subscription" +
DlqTopicSuffix
+ oldRetryTopic := "persistent://public/default/test-subscription" +
RetryTopicSuffix
+
+ // Verify that GetPartitionedTopicMetadata was never called with
old-format topics.
+ // When custom DLQ/Retry topics are provided, resolveTopic should
return them directly
+ // without checking whether old-format topics exist.
+ wrapper.mu.Lock()
+ defer wrapper.mu.Unlock()
+ for _, topic := range wrapper.calledTopics {
+ assert.NotEqual(t, oldDlqTopic, topic,
+ "GetPartitionedTopicMetadata should not be called with
old DLQ topic when custom DLQ topic is provided")
+ assert.NotEqual(t, oldRetryTopic, topic,
+ "GetPartitionedTopicMetadata should not be called with
old Retry topic when custom Retry topic is provided")
+ }
+}