Copilot commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3377613245


##########
pulsar/consumer_partition.go:
##########
@@ -331,6 +338,24 @@ func (p *availablePermits) flowIfNeed() {
        }
 }
 
+func (p *availablePermits) flush() {
+       if p.pc.paused.Load() {
+               return
+       }
+
+       current := p.get()
+       if current > 0 {
+               if !p.permits.CompareAndSwap(current, 0) {
+                       return
+               }
+
+               p.pc.log.Debugf("flushing withheld permits=%d", current)
+               if err := p.pc.internalFlow(uint32(current)); err != nil {
+                       p.pc.log.WithError(err).Error("unable to send permits")
+               }
+       }
+}

Review Comment:
   `availablePermits.flush()` does a single `CompareAndSwap` attempt and 
returns on failure. Under concurrent `add()` activity this can exit without 
sending any permits even though `permits` remains > 0, which defeats the 
purpose of `resume()` (and can leave the consumer stuck if no further `add()` 
calls occur to reach the flow threshold). Make `flush()` retry the CAS until it 
either observes 0 permits or successfully swaps the current value to 0 and 
sends it.
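Here is a minimal sketch of the suggested retry loop, assuming Go 1.19+ typed atomics. `permitCounter` and its `send` callback are hypothetical stand-ins for `availablePermits` and `partitionConsumer.internalFlow`. The real type also tracks the flow threshold and logs through `pc.log`, and the sketch leaves both out.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// permitCounter is a hypothetical stand-in for availablePermits: it keeps only
// the atomic counter, the paused flag, and a send callback.
type permitCounter struct {
	permits atomic.Int32
	paused  atomic.Bool
	send    func(n uint32) error // stands in for partitionConsumer.internalFlow
}

// flush retries the CompareAndSwap until it either sees no withheld permits or
// wins the swap, so a racing add() only causes another iteration instead of
// an early return.
func (p *permitCounter) flush() {
	if p.paused.Load() {
		return
	}
	for {
		current := p.permits.Load()
		if current <= 0 {
			return // nothing withheld
		}
		if p.permits.CompareAndSwap(current, 0) {
			if err := p.send(uint32(current)); err != nil {
				fmt.Println("unable to send permits:", err) // real code logs via pc.log
			}
			return
		}
		// Lost the race with a concurrent add(); reload and retry.
	}
}
```

The loop exits only after it reads a non-positive counter or successfully swaps the value it read to zero and sends it. A concurrent `add()` can therefore delay `flush()`, but it cannot make `flush()` give up while the counter is still positive.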



##########
pulsar/consumer_test.go:
##########
@@ -6441,3 +6441,224 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
                })
        }
 }
+
+func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout time.Duration) int {
+       t.Helper()
+       count := 0
+       for {
+               ctx, cancel := context.WithTimeout(context.Background(), perMsgTimeout)
+               msg, err := consumer.Receive(ctx)
+               cancel()
+               if err != nil {
+                       return count
+               }

Review Comment:
   `drainUntilTimeout` stops draining on *any* `Receive` error and treats it as 
the timeout condition. That can hide unexpected failures (e.g., consumer 
closed, connection errors) and make the pause/resume assertions misleading. 
Assert that the error is actually the expected context deadline before 
returning.



##########
pulsar/consumer_regex.go:
##########
@@ -348,6 +351,28 @@ func (c *regexConsumer) Name() string {
        return c.consumerName
 }
 
+func (c *regexConsumer) Pause() {
+       c.consumersLock.Lock()
+       defer c.consumersLock.Unlock()
+       c.paused.Store(true)
+       for _, con := range c.consumers {
+               con.Pause()
+       }
+}
+
+func (c *regexConsumer) Resume() {
+       c.consumersLock.Lock()
+       defer c.consumersLock.Unlock()
+       c.paused.Store(false)
+       for _, con := range c.consumers {
+               con.Resume()
+       }
+}

Review Comment:
   `regexConsumer.Pause/Resume` hold `consumersLock` while calling 
`con.Pause()` / `con.Resume()` on child consumers. `Resume()` can trigger 
`partitionConsumer.resume()` → `internalFlow()` (network I/O), so this can 
block topic discovery/unsubscribe work and increases deadlock risk if any child 
implementation calls back into `regexConsumer`. Snapshot the current consumers 
under the lock, then release the lock before invoking child methods.
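The snapshot-then-release pattern the comment suggests might look like the sketch below. `parent`, `child`, `funcChild`, and `setPausedAndSnapshot` are hypothetical reductions of `regexConsumer` and its per-topic consumers. The `paused` flag is still updated under `consumersLock`. Assuming topic discovery reads the flag under that same lock when it adds a consumer, a newly discovered consumer cannot miss a `Pause()`.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// child is a hypothetical stand-in for the per-topic consumers that
// regexConsumer manages.
type child interface {
	Pause()
	Resume()
}

// funcChild adapts two closures to the child interface (handy for tests).
type funcChild struct{ pause, resume func() }

func (f funcChild) Pause()  { f.pause() }
func (f funcChild) Resume() { f.resume() }

// parent is a hypothetical reduction of regexConsumer: a lock-guarded map of
// children plus a paused flag.
type parent struct {
	consumersLock sync.Mutex
	consumers     map[string]child
	paused        atomic.Bool
}

// setPausedAndSnapshot updates the flag and copies the children while holding
// the lock, then releases it so the caller can invoke them lock-free.
func (c *parent) setPausedAndSnapshot(paused bool) []child {
	c.consumersLock.Lock()
	defer c.consumersLock.Unlock()
	c.paused.Store(paused)
	out := make([]child, 0, len(c.consumers))
	for _, con := range c.consumers {
		out = append(out, con)
	}
	return out
}

// Pause calls each child without holding consumersLock.
func (c *parent) Pause() {
	for _, con := range c.setPausedAndSnapshot(true) {
		con.Pause()
	}
}

// Resume also runs with the lock released; in the real consumer a child's
// Resume() may send flow permits over the network.
func (c *parent) Resume() {
	for _, con := range c.setPausedAndSnapshot(false) {
		con.Resume()
	}
}
```

One caveat remains. Concurrent `Pause()` and `Resume()` calls can still interleave their child calls after the lock is released, so the children's final state may not match the flag. Serializing the two methods with a separate mutex that is not held by discovery would close that gap.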



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
