PavelZeger commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3403447489


##########
pulsar/consumer_regex.go:
##########
@@ -348,6 +351,28 @@ func (c *regexConsumer) Name() string {
        return c.consumerName
 }
 
+func (c *regexConsumer) Pause() {
+       c.consumersLock.Lock()
+       defer c.consumersLock.Unlock()
+       c.paused.Store(true)
+       for _, con := range c.consumers {
+               con.Pause()
+       }
+}
+
+func (c *regexConsumer) Resume() {
+       c.consumersLock.Lock()
+       defer c.consumersLock.Unlock()
+       c.paused.Store(false)
+       for _, con := range c.consumers {
+               con.Resume()
+       }
+}

Review Comment:
   Fixed. I now snapshot the child consumers under `consumersLock`, release the 
lock, and then call `Pause()`/`Resume()` on each. One clarification: this isn't 
actually a deadlock risk. The child `resume()` never calls back into 
`regexConsumer`, so there's no lock cycle, and internalFlow isn't a blocking 
network round-trip - it's a send to a buffered channel that only blocks under 
write backpressure. So the real benefit here is avoiding lock contention with 
topic discovery (subscribe/unsubscribe take the same lock), not preventing a 
deadlock. The snapshot approach is safe because c.paused is still set under the 
lock before the snapshot, so any consumer added concurrently by discovery still 
reads the correct paused state.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to