This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d7fafb5a feat: regard ProducerBlockedQuotaExceededException as
retryable exception to continue to reconnect (#1457)
d7fafb5a is described below
commit d7fafb5a559a1bcd744dff2c3fb2493884621bbd
Author: zhou zhuohan <[email protected]>
AuthorDate: Fri Mar 20 16:43:46 2026 +0800
feat: regard ProducerBlockedQuotaExceededException as retryable exception
to continue to reconnect (#1457)
### Motivation
Currently, the Go client handles connections in two ways:
1. If the current connection is the initial connection, that is, the one made
when the producer is first initialized, the `grabCnx()` method does not
trigger retry logic. If an error occurs, it is returned directly to the
caller.
2. If the current connection is an already active connection, the
`runEventsLoop()` method will detect the disconnection and call the
`reconnectToBroker()` method to re-establish the connection. During
reconnection, the method identifies whether the current exception is a
retryable error. If it is a retryable error, the `internal.Retry()` inside the
`reconnectToBroker()` method will retry indefinitely based on the backoff
strategy. If it is a non-retryable error, such as the current ` [...]
Since the `backlogQuotaExceedException` can be resolved by modifying the
TTL or increasing consumption speed, it is likely to recover after some retry
attempts. I believe the exception handling for
`errMsgProducerBlockedQuotaExceededException` in the `reconnectToBroker()`
method should align as closely as possible with the Java implementation. Java’s
current retry logic for connections is as follows:
1. If an error is received after sending a message or a heartbeat times
out, the callback function `handleSendError()` is triggered to close the
connection.
2. When the connection is closed, the `connectionClosed` method is called,
which in turn invokes the `grabCnx()` method, and subsequently `grabCnx()`
calls the `connectionOpened()` method to reconnect:
```Java
// Schedule a reconnection task
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", state.topic,
state.getHandlerName());
grabCnx(); // Re-establish the connection
}, delayMs, TimeUnit.MILLISECONDS);
```
3. If the connection still fails in the `connectionOpened()` method, the
client will execute different logic based on the current exception. For the
`ProducerBlockedQuotaExceededException` exception, the Java client does not
treat it as an `isUnrecoverableError`. Instead, it first cleans up the pending
messages and then attempts to reconnect.
### Modifications
1. To maintain consistency with the Java client, I suggest that when
`errMsgProducerBlockedQuotaExceededException` is encountered in
`reconnectToBroker()`, only `failPendingMessages()` should be executed and the
method should not return early, so that the retry loop keeps trying to
reconnect. Specifically, the code should be modified as follows:
```Go
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
    p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, will retry reconnecting")
    p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
    // Do not return here, so that the retry loop keeps trying to reconnect
    // return struct{}{}, nil
}
```
2. Update the golangci-lint version in the Makefile from v1.61.0 to
v1.64.2, which already meets the minimum requirement for Golang 1.24.
---
Makefile | 4 +-
pulsar/producer_partition.go | 5 +-
pulsar/producer_test.go | 135 +++++++++++++++++++++++++++++++++++++++++++
3 files changed, 140 insertions(+), 4 deletions(-)
diff --git a/Makefile b/Makefile
index 06e18025..bee9902a 100644
--- a/Makefile
+++ b/Makefile
@@ -44,13 +44,13 @@ lint: bin/golangci-lint
bin/golangci-lint run
bin/golangci-lint:
- GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0
+ GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.2
# an alternative to above `make lint` command
# use golangCi-lint docker to avoid local golang env issues
# https://golangci-lint.run/welcome/install/
lint-docker:
- docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v
+ docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.64.2 golangci-lint run -v
container:
docker build -t ${IMAGE_NAME} \
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 06021eea..1e0361bc 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -523,9 +523,10 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
}
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
- p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
+ // ProducerBlockedQuotaExceededException is a retryable exception,
+ // we only fail pending messages but continue trying to reconnect
+ p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, will retry reconnecting")
p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
- return struct{}{}, nil
}
if strings.Contains(errMsg, errMsgProducerFenced) {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 30805942..529511a0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
+ "log/slog"
"net/http"
"os"
"strconv"
@@ -2952,3 +2953,137 @@ func TestPartitionUpdateFailed(t *testing.T) {
time.Sleep(time.Second * 1)
}
}
+
+type testReconnectBackoffPolicy struct {
+ curBackoff, minBackoff, maxBackoff time.Duration
+ retryTime int
+ lock sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+ return &testReconnectBackoffPolicy{
+ curBackoff: 0,
+ minBackoff: minBackoff,
+ maxBackoff: maxBackoff,
+ }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ // Double the delay each time
+ b.curBackoff += b.curBackoff
+ if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+ b.curBackoff = b.minBackoff
+ } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+ b.curBackoff = b.maxBackoff
+ }
+ b.retryTime++
+ return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+ return true
+}
+
+func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
+ slog.SetDefault(logger)
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ Logger: plog.NewLoggerWithSlog(logger),
+ })
+ defer client.Close()
+ namespace := "public/" + generateRandomName()
+ assert.NoError(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{
+ WebServiceURL: adminURL,
+ })
+ assert.NoError(t, err)
+ // Step 1: Create namespace and configure 10KB backlog quota with producer_exception policy
+ // When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
+ err = admin.Namespaces().CreateNamespace(namespace)
+ assert.NoError(t, err)
+ err = admin.Namespaces().SetBacklogQuota(
+ namespace,
+ utils.NewBacklogQuota(10*1024, -1, utils.ProducerException),
+ utils.DestinationStorage,
+ )
+ assert.NoError(t, err)
+
+ // Verify backlog quota configuration
+ quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
+ assert.NoError(t, err)
+ logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
+
+ // Create test topic
+ topicName := namespace + "/test-topic"
+ tn, err := utils.GetTopicName(topicName)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*tn, 1)
+ assert.NoError(t, err)
+
+ // Step 2: Create consumer with small receiver queue size and earliest subscription position
+ // This ensures that by sending a 512KB message (much larger than the 10KB backlog quota),
+ // the producer will quickly reach the backlog quota limit
+ _consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 1,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer _consumer.Close()
+
+ // Step 3: Create producer with custom backoff policy to reduce retry interval
+ bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: true,
+ SendTimeout: 5 * time.Minute,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return bo
+ },
+ })
+ assert.NoError(t, err)
+ defer _producer.Close()
+
+ // Step 4: Send 512KB messages and monitor statistics
+ // Limit to 10 iterations to avoid infinite loop in test
+ isReachMaxBackoff := false
+ for i := 0; i < 10; i++ {
+ _producer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 512*1024),
+ }, func(msgId MessageID, _ *ProducerMessage, err error) {
+ if err != nil {
+ logger.Error("sendAsync fail", "time", time.Now().String(), "err", err.Error())
+ return
+ }
+ logger.Info("sendAsync success", "msgId", msgId.String(), "time", time.Now().String())
+ })
+
+ // Get topic statistics for debugging
+ stats, err := admin.Topics().GetPartitionedStats(*tn, false)
+ assert.NoError(t, err)
+ logger.Info("current backlogSize", "backlogSize", stats.Subscriptions["my-sub"].BacklogSize)
+ if bo.IsMaxBackoffReached() {
+ isReachMaxBackoff = true
+ break
+ }
+ time.Sleep(10 * time.Second)
+ }
+
+ // Step 5: Verify that backoff mechanism reaches maximum retry limit
+ // This indicates that producer successfully detected backlog quota limit and triggered reconnection mechanism
+ assert.True(t, isReachMaxBackoff)
+}