This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d7fafb5a feat: regard ProducerBlockedQuotaExceededException as 
retryable exception to continue to reconnect (#1457)
d7fafb5a is described below

commit d7fafb5a559a1bcd744dff2c3fb2493884621bbd
Author: zhou zhuohan <[email protected]>
AuthorDate: Fri Mar 20 16:43:46 2026 +0800

    feat: regard ProducerBlockedQuotaExceededException as retryable exception 
to continue to reconnect (#1457)
    
    
    ### Motivation
    Currently, the Go client handles connections in two ways:
    1. If the current connection is the initial connection, made when the 
producer is first created, the `grabCnx()` method does not trigger retry 
logic. If an error occurs, it is returned directly to the caller.
    2. If the current connection is an already active connection, the 
`runEventsLoop()` method will detect the disconnection and call the 
`reconnectToBroker()` method to re-establish the connection. During 
reconnection, the method identifies whether the current exception is a 
retryable error. If it is a retryable error, the `internal.Retry()` inside the 
`reconnectToBroker()` method will retry indefinitely based on the backoff 
strategy. If it is a non-retryable error, such as the current ` [...]
    
    Since the `backlogQuotaExceedException` can be resolved by modifying the 
TTL or increasing consumption speed, it is likely to recover after some retry 
attempts. I believe the exception handling for 
`errMsgProducerBlockedQuotaExceededException` in the `reconnectToBroker()` 
method should align as closely as possible with the Java implementation. Java’s 
current retry logic for connections is as follows:
    1. If an error is received after sending a message or a heartbeat times 
out, the callback function `handleSendError()` is triggered to close the 
connection.
    2. When the connection is closed, the `connectionClosed` method is called, 
which in turn invokes the `grabCnx()` method, and subsequently `grabCnx()` 
calls the `connectionOpened()` method to reconnect:
    ```Java
    // Schedule a reconnection task
    state.client.timer().newTimeout(timeout -> {
        log.info("[{}] [{}] Reconnecting after timeout", state.topic, 
state.getHandlerName());
        grabCnx();  // Re-establish the connection
    }, delayMs, TimeUnit.MILLISECONDS);
    ```
    3. If the connection still fails in the `connectionOpened()` method, the 
client executes different logic depending on the exception. The Java client 
does not classify `ProducerBlockedQuotaExceededException` as unrecoverable 
(`isUnrecoverableError`). Instead, it first cleans up the pending messages and 
then attempts to reconnect.
    
    ### Modifications
    1. To maintain consistency with the Java client, I suggest that when 
`errMsgProducerBlockedQuotaExceededException` is encountered in 
`reconnectToBroker()`, only `failPendingMessages()` should be executed; the 
method should not return early, so that reconnection continues. Specifically, 
the code should be modified as follows:
    ```Go
    if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
        // Retryable: fail pending messages, but keep trying to reconnect
        p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, will retry reconnecting")
        p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
        // No early return here, so the retry loop continues
        // return struct{}{}, nil
    }
    ```
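
    The effect of dropping the early return can be illustrated with a 
    standalone model of a retry loop. This is only a sketch under stated 
    assumptions: `reconnect`, the `connect` callback, `maxAttempts`, and the 
    two error-string constants are hypothetical stand-ins, not the client's 
    actual API, and the backoff sleep between attempts is omitted.

    ```go
    package main

    import (
    	"errors"
    	"fmt"
    	"strings"
    )

    // Hypothetical stand-ins for the broker error strings that the real
    // reconnectToBroker() matches against; the actual values may differ.
    const (
    	errQuotaExceeded = "ProducerBlockedQuotaExceededException"
    	errFenced        = "ProducerFenced"
    )

    // reconnect models a retry loop: connect is called until it succeeds,
    // returns a terminal error, or maxAttempts is reached. It reports how many
    // attempts were made and how often pending messages were failed.
    func reconnect(connect func(attempt int) error, maxAttempts int) (attempts, failedPending int, err error) {
    	for attempts < maxAttempts {
    		attempts++
    		err = connect(attempts)
    		if err == nil {
    			return attempts, failedPending, nil
    		}
    		msg := err.Error()
    		if strings.Contains(msg, errQuotaExceeded) {
    			// Retryable: fail what is queued, then fall through to the
    			// next attempt instead of returning.
    			failedPending++
    		}
    		if strings.Contains(msg, errFenced) {
    			// Terminal: stop retrying and surface the error.
    			return attempts, failedPending, err
    		}
    	}
    	return attempts, failedPending, err
    }

    func main() {
    	// The broker rejects the first two attempts with a quota error,
    	// then accepts the third once the backlog has drained.
    	connect := func(attempt int) error {
    		if attempt <= 2 {
    			return errors.New(errQuotaExceeded)
    		}
    		return nil
    	}
    	attempts, failed, err := reconnect(connect, 10)
    	fmt.Println(attempts, failed, err) // prints "3 2 <nil>"
    }
    ```

    With an early return in the quota branch, the loop would exit after the 
    first attempt and the producer would never recover, even after the backlog 
    drops below the quota.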
    
    2. Update the golangci-lint version in the Makefile from v1.61.0 to 
v1.64.2, which already meets the minimum requirement for Golang 1.24.
---
 Makefile                     |   4 +-
 pulsar/producer_partition.go |   5 +-
 pulsar/producer_test.go      | 135 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 140 insertions(+), 4 deletions(-)

diff --git a/Makefile b/Makefile
index 06e18025..bee9902a 100644
--- a/Makefile
+++ b/Makefile
@@ -44,13 +44,13 @@ lint: bin/golangci-lint
        bin/golangci-lint run
 
 bin/golangci-lint:
-       GOBIN=$(shell pwd)/bin go install 
github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0
+       GOBIN=$(shell pwd)/bin go install 
github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.2
 
 # an alternative to above `make lint` command
 # use golangCi-lint docker to avoid local golang env issues
 # https://golangci-lint.run/welcome/install/
 lint-docker:
-       docker run --rm -v $(shell pwd):/app -w /app 
golangci/golangci-lint:v1.61.0 golangci-lint run -v
+       docker run --rm -v $(shell pwd):/app -w /app 
golangci/golangci-lint:v1.64.2 golangci-lint run -v
 
 container:
        docker build -t ${IMAGE_NAME} \
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 06021eea..1e0361bc 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -523,9 +523,10 @@ func (p *partitionProducer) 
reconnectToBroker(connectionClosed *connectionClosed
                }
 
                if strings.Contains(errMsg, 
errMsgProducerBlockedQuotaExceededException) {
-                       p.log.Warn("Producer was blocked by quota exceed 
exception, failing pending messages, stop reconnecting")
+                       //      ProducerBlockedQuotaExceededException is a 
retryable exception,
+                       //      we only fail pending messages but continue 
trying to reconnect
+                       p.log.Warn("Producer was blocked by quota exceed 
exception, failing pending messages, will retry reconnecting")
                        
p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
-                       return struct{}{}, nil
                }
 
                if strings.Contains(errMsg, errMsgProducerFenced) {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 30805942..529511a0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -21,6 +21,7 @@ import (
        "context"
        "errors"
        "fmt"
+       "log/slog"
        "net/http"
        "os"
        "strconv"
@@ -2952,3 +2953,137 @@ func TestPartitionUpdateFailed(t *testing.T) {
                time.Sleep(time.Second * 1)
        }
 }
+
+type testReconnectBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+       lock                               sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) 
*testReconnectBackoffPolicy {
+       return &testReconnectBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       // Double the delay each time
+       b.curBackoff += b.curBackoff
+       if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+               b.curBackoff = b.minBackoff
+       } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+               b.curBackoff = b.maxBackoff
+       }
+       b.retryTime++
+       return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+       return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+       return true
+}
+
+func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
+       logger := slog.New(slog.NewJSONHandler(os.Stdout, 
&slog.HandlerOptions{Level: slog.LevelInfo}))
+       slog.SetDefault(logger)
+       client, err := NewClient(ClientOptions{
+               URL:    serviceURL,
+               Logger: plog.NewLoggerWithSlog(logger),
+       })
+       defer client.Close()
+       namespace := "public/" + generateRandomName()
+       assert.NoError(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminURL,
+       })
+       assert.NoError(t, err)
+       // Step 1: Create namespace and configure 10KB backlog quota with 
producer_exception policy
+       // When subscription backlog stats refresh and reach the limit, 
producer will encounter BlockQuotaExceed exception
+       err = admin.Namespaces().CreateNamespace(namespace)
+       assert.NoError(t, err)
+       err = admin.Namespaces().SetBacklogQuota(
+               namespace,
+               utils.NewBacklogQuota(10*1024, -1, utils.ProducerException),
+               utils.DestinationStorage,
+       )
+       assert.NoError(t, err)
+
+       // Verify backlog quota configuration
+       quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
+       assert.NoError(t, err)
+       logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
+
+       // Create test topic
+       topicName := namespace + "/test-topic"
+       tn, err := utils.GetTopicName(topicName)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*tn, 1)
+       assert.NoError(t, err)
+
+       // Step 2: Create consumer with small receiver queue size and earliest 
subscription position
+       // This ensures that by sending a 512KB message (much larger than the 
10KB backlog quota),
+       // the producer will quickly reach the backlog quota limit
+       _consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topicName,
+               SubscriptionName:            "my-sub",
+               Type:                        Exclusive,
+               ReceiverQueueSize:           1,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer _consumer.Close()
+
+       // Step 3: Create producer with custom backoff policy to reduce retry 
interval
+       bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
+       _producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: true,
+               SendTimeout:     5 * time.Minute,
+               BackOffPolicyFunc: func() backoff.Policy {
+                       return bo
+               },
+       })
+       assert.NoError(t, err)
+       defer _producer.Close()
+
+       // Step 4: Send 512KB messages and monitor statistics
+       // Limit to 10 iterations to avoid infinite loop in test
+       isReachMaxBackoff := false
+       for i := 0; i < 10; i++ {
+               _producer.SendAsync(context.Background(), &ProducerMessage{
+                       Payload: make([]byte, 512*1024),
+               }, func(msgId MessageID, _ *ProducerMessage, err error) {
+                       if err != nil {
+                               logger.Error("sendAsync fail", "time", 
time.Now().String(), "err", err.Error())
+                               return
+                       }
+                       logger.Info("sendAsync success", "msgId", 
msgId.String(), "time", time.Now().String())
+               })
+
+               // Get topic statistics for debugging
+               stats, err := admin.Topics().GetPartitionedStats(*tn, false)
+               assert.NoError(t, err)
+               logger.Info("current backlogSize", "backlogSize", 
stats.Subscriptions["my-sub"].BacklogSize)
+               if bo.IsMaxBackoffReached() {
+                       isReachMaxBackoff = true
+                       break
+               }
+               time.Sleep(10 * time.Second)
+       }
+
+       // Step 5: Verify that backoff mechanism reaches maximum retry limit
+       // This indicates that producer successfully detected backlog quota 
limit and triggered reconnection mechanism
+       assert.True(t, isReachMaxBackoff)
+}
