Copilot commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3370589479


##########
pulsar/consumer_test.go:
##########
@@ -6441,3 +6441,234 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
                })
        }
 }
+
+func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout time.Duration) int {
+       t.Helper()
+       count := 0
+       for {
+               ctx, cancel := context.WithTimeout(context.Background(), perMsgTimeout)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               if err != nil {
+                       return count
+               }
+
+               ackErr := consumer.Ack(msg)
+               if ackErr != nil {
+                       return 0
+               }

Review Comment:
   In `drainUntilTimeout`, an `Ack` error causes the helper to silently return 
`0`, which can hide real failures and make downstream assertions misleading. It 
should fail the test immediately on ack errors (similar to how receive errors 
are handled via the timeout).



##########
pulsar/consumer_test.go:
##########
@@ -6441,3 +6441,234 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
                })
        }
 }
+
+func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout time.Duration) int {
+       t.Helper()
+       count := 0
+       for {
+               ctx, cancel := context.WithTimeout(context.Background(), perMsgTimeout)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               if err != nil {
+                       return count
+               }
+
+               ackErr := consumer.Ack(msg)
+               if ackErr != nil {
+                       return 0
+               }
+
+               count++
+       }
+}
+
+func drainExactly(t *testing.T, consumer Consumer, want int) {
+       t.Helper()
+       for i := 0; i < want; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               assert.Nil(t, err)
+               if err != nil {
+                       return
+               }
+
+               ackErr := consumer.Ack(msg)
+               if ackErr != nil {
+                       return
+               }

Review Comment:
   In `drainExactly`, `Ack` errors currently cause an early return without 
failing the test, which can mask issues in the pause/resume tests. This helper 
should require the ack to succeed so test failures are surfaced 
deterministically.



##########
pulsar/consumer_partition.go:
##########
@@ -1771,6 +1778,16 @@ func (pc *partitionConsumer) SetRedirectedClusterURI(redirectedClusterURI string
        pc.redirectedClusterURI = redirectedClusterURI
 }
 
+func (pc *partitionConsumer) pause() {
+       pc.paused.Store(true)
+}
+
+func (pc *partitionConsumer) resume() {
+       if pc.paused.CompareAndSwap(true, false) {
+               pc.availablePermits.flowIfNeed()
+       }

Review Comment:
   `resume()` only calls `availablePermits.flowIfNeed()`, which may be a no-op 
when the number of withheld permits is below the flow threshold (e.g., 
maxQueueSize=5 => threshold=2, withheld=1). In that case the broker may have 
zero permits and the consumer can remain stuck after `Resume()` because no 
further messages arrive to increase `availablePermits` to the threshold. 
`Resume()` should flush any positive withheld permits immediately (without the 
threshold gating), while still avoiding over-granting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to