Copilot commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3370589479
##########
pulsar/consumer_test.go:
##########
@@ -6441,3 +6441,234 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
})
}
}
+
+func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout
time.Duration) int {
+ t.Helper()
+ count := 0
+ for {
+ ctx, cancel := context.WithTimeout(context.Background(),
perMsgTimeout)
+ msg, err := consumer.Receive(ctx)
+ cancel()
+ if err != nil {
+ return count
+ }
+
+ ackErr := consumer.Ack(msg)
+ if ackErr != nil {
+ return 0
+ }
Review Comment:
In `drainUntilTimeout`, an `Ack` error causes the helper to silently return
`0`, which can hide real failures and make downstream assertions misleading. It
should fail the test immediately on ack errors (similar to how receive errors
are handled via the timeout).
##########
pulsar/consumer_test.go:
##########
@@ -6441,3 +6441,234 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
})
}
}
+
+func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout
time.Duration) int {
+ t.Helper()
+ count := 0
+ for {
+ ctx, cancel := context.WithTimeout(context.Background(),
perMsgTimeout)
+ msg, err := consumer.Receive(ctx)
+ cancel()
+ if err != nil {
+ return count
+ }
+
+ ackErr := consumer.Ack(msg)
+ if ackErr != nil {
+ return 0
+ }
+
+ count++
+ }
+}
+
+func drainExactly(t *testing.T, consumer Consumer, want int) {
+ t.Helper()
+ for i := 0; i < want; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
10*time.Second)
+ msg, err := consumer.Receive(ctx)
+ cancel()
+ assert.Nil(t, err)
+ if err != nil {
+ return
+ }
+
+ ackErr := consumer.Ack(msg)
+ if ackErr != nil {
+ return
+ }
Review Comment:
In `drainExactly`, `Ack` errors currently cause an early return without
failing the test, which can mask issues in the pause/resume tests. This helper
should require the ack to succeed so test failures are surfaced
deterministically.
##########
pulsar/consumer_partition.go:
##########
@@ -1771,6 +1778,16 @@ func (pc *partitionConsumer)
SetRedirectedClusterURI(redirectedClusterURI string
pc.redirectedClusterURI = redirectedClusterURI
}
+func (pc *partitionConsumer) pause() {
+ pc.paused.Store(true)
+}
+
+func (pc *partitionConsumer) resume() {
+ if pc.paused.CompareAndSwap(true, false) {
+ pc.availablePermits.flowIfNeed()
+ }
Review Comment:
`resume()` only calls `availablePermits.flowIfNeed()`, which may be a no-op
when the number of withheld permits is below the flow threshold (e.g.,
maxQueueSize=5 => threshold=2, withheld=1). In that case the broker may have
zero permits and the consumer can remain stuck after `Resume()` because no
further messages arrive to increase `availablePermits` to the threshold.
`Resume()` should flush any positive withheld permits immediately (without the
threshold gating), while still avoiding over-granting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]