PavelZeger commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3376077338
##########
pulsar/consumer_partition.go:
##########
@@ -1771,6 +1778,16 @@ func (pc *partitionConsumer) SetRedirectedClusterURI(redirectedClusterURI string
pc.redirectedClusterURI = redirectedClusterURI
}
+func (pc *partitionConsumer) pause() {
+ pc.paused.Store(true)
+}
+
+func (pc *partitionConsumer) resume() {
+ if pc.paused.CompareAndSwap(true, false) {
+ pc.availablePermits.flowIfNeed()
+ }
Review Comment:
Agreed. `resume()` was going through `flowIfNeed()`, which only sends
permits once the owed count reaches half the queue size. That's fine for normal
operation (we don't want a flow message for every single message), but it's
wrong for resume. If the broker has zero permits and we owe it fewer than that
threshold, resume sends nothing, and since no messages come in, the owed count
never climbs up to the threshold. The consumer just sits there stuck.
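
To make the stall concrete, here is a minimal single-goroutine sketch. It is not the client's actual code: `permitTracker`, its fields, and `flowIfNeeded` are hypothetical names, and the numbers (queue size 10, 3 permits owed) are assumed purely for illustration.

```go
package main

import "fmt"

// permitTracker is a hypothetical model of the consumer's permit
// accounting; it is not the real type used in pulsar-client-go.
type permitTracker struct {
	owed      int32 // permits the consumer owes the broker
	queueSize int32 // receiver queue size
	sent      int32 // total permits sent in flow requests so far
}

// flowIfNeeded sends the owed permits only once they reach half the
// queue size, mirroring the threshold described above.
func (t *permitTracker) flowIfNeeded() bool {
	if t.owed < t.queueSize/2 {
		return false // below the threshold: nothing is sent
	}
	t.sent += t.owed
	t.owed = 0
	return true
}

func main() {
	// Assumed state at resume: the broker holds zero permits and the
	// consumer owes 3, which is below the threshold of 5.
	t := &permitTracker{owed: 3, queueSize: 10}
	fmt.Println(t.flowIfNeeded(), t.sent, t.owed)
}
```

In this model no flow request goes out, so the broker delivers nothing, and `owed` has no way to grow toward the threshold.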

Java has the same gap. Its `resume()` calls
`increaseAvailablePermits(cnx, 0)`, which checks the same threshold. So this
fix actually makes the Go client a little more correct than Java here. I've
opened an issue to fix the Java client as well:
https://github.com/apache/pulsar/issues/25978.

I added a small `flush()` method that `resume()` now uses instead. It just
sends whatever we owe, with no threshold check. Since it sends exactly the owed
amount, it can never send too much (which was the original PIP concern). It
uses the same compare-and-swap as the normal path, so it won't race or
double-send.
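
A rough sketch of that idea follows, assuming an atomic counter for the owed permits. The `permits` type, its `send` callback, and the loop shape are hypothetical stand-ins, not the code in this PR; only the standard library `sync/atomic` calls are real.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// permits is a hypothetical stand-in for the consumer's permit counter.
type permits struct {
	owed atomic.Int32
	send func(n int32) // stand-in for sending a flow request to the broker
}

// flush sends whatever is currently owed, with no threshold check.
func (p *permits) flush() {
	for {
		n := p.owed.Load()
		if n <= 0 {
			return // nothing owed, nothing to send
		}
		// Claim exactly n permits. If another goroutine changed the
		// counter in the meantime, the swap fails and we retry.
		if p.owed.CompareAndSwap(n, 0) {
			p.send(n)
			return
		}
	}
}

func main() {
	var total int32
	p := &permits{send: func(n int32) { total += n }}
	p.owed.Store(3)
	p.flush()
	p.flush() // the second call finds nothing owed and sends nothing
	fmt.Println(total, p.owed.Load())
}
```

Because the counter is reset to zero in the same compare-and-swap that claims the permits, a second caller in this sketch cannot claim the same permits again.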
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.