aleks-lazic opened a new issue, #1473:
URL: https://github.com/apache/pulsar-client-go/issues/1473
### Expected behavior
A failover consumer should receive all `ActiveConsumerChange` notifications
and all messages published after the subscribe RPC succeeds. When a consumer
subscribes to a failover subscription, the broker delivers messages and state
changes to it — the Go client should route them to the consumer's
`MessageChannel` and event listener.
### Actual behavior
`MESSAGE` and `ACTIVE_CONSUMER_CHANGE` commands sent by the broker
immediately after the subscribe RPC succeeds are silently dropped by the Go
client. The client logs:
```
WARN Consumer not found while active consumer change consumerID=N
WARN Got unexpected message consumerID=N
```
The consumer is subscribed and active from the broker's perspective — the
broker correctly delivers the commands. But the Go client has not yet
registered the consumer handler in its internal map, so `handleMessage`
(connection.go:779-789) and `handleActiveConsumerChange`
(connection.go:931-935) cannot find the consumer and discard the commands.
This race exists on **every** subscribe call — initial or resubscription.
Any `ActiveConsumerChange` or `MESSAGE` command delivered by the broker during
the registration window is dropped. In practice the race is most reliably
triggered during resubscription, because the broker delivers state immediately
on a connection it has already been communicating on. But the underlying window
exists unconditionally.
While the broker will eventually redeliver unacked messages, this creates a
serious correctness hazard for consumers using `AckCumulative`. A later message
may be acknowledged cumulatively, implicitly acknowledging the dropped message
before the application ever processes it. The message is then permanently lost
from the application's perspective — with no error, no warning, and no way to
detect it. Cumulative acknowledgment is the natural pattern for failover
consumers, making this the most likely production scenario to be affected.
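The cumulative-ack hazard can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: message IDs are plain integers starting at 1, and `lostByCumulativeAck` is a hypothetical helper, not part of pulsar-client-go. It reports which IDs end up acknowledged without ever reaching the application.

```go
package main

import "fmt"

// lostByCumulativeAck is a hypothetical model, not client code. `delivered`
// lists the message IDs (starting at 1) that actually reached the application.
// The application acks cumulatively, so the highest delivered ID implicitly
// acknowledges every lower ID. The result is the set of IDs that were
// acknowledged this way but never processed.
func lostByCumulativeAck(delivered []int) []int {
	seen := make(map[int]bool)
	ackedUpTo := 0
	for _, id := range delivered {
		seen[id] = true
		if id > ackedUpTo {
			ackedUpTo = id // a cumulative ack covers every ID <= id
		}
	}
	var lost []int
	for id := 1; id <= ackedUpTo; id++ {
		if !seen[id] {
			lost = append(lost, id)
		}
	}
	return lost
}

func main() {
	// Message 1 is dropped in the registration window; 2 and 3 arrive normally.
	fmt.Println(lostByCumulativeAck([]int{2, 3}))
}
```

In this model, dropping message 1 and then acknowledging message 3 cumulatively leaves message 1 acknowledged but unprocessed, which matches the silent loss described above.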
### Steps to reproduce
The race is reliably triggered when a failover consumer closes and
resubscribes — a normal operational pattern when reacting to `BecameInactive`.
The broker sends `ActiveConsumerChange` for the new consumer immediately,
racing with `AddConsumeHandler`.
A log trace from a real occurrence showed the following key sequence:
1. Consumer 1 subscribes and enters `runLoop` (`has_persister=false`, not
   yet active).
2. A price update is published and delivered by the broker.
3. Consumer 1 receives `BecameInactive` (`msg_count=1`): the message was
   received but the persister wasn't ready, so it could not be processed.
   Consumer 1 is closed and the service resubscribes.
4. Consumer 3 is created. The broker sends `ActiveConsumerChange`
   immediately, **before `AddConsumeHandler` is called**. The notification
   is dropped.
5. `BecameActive` finally arrives again ~3 seconds later (broker retransmit).
   The persister starts, but the original message was already consumed by
   consumer 1, never processed, and silently lost.
### Root cause
This is a race condition between the connection's read goroutine (which
dispatches incoming frames) and the consumer registration via
`AddConsumeHandler`.
In `consumer_partition.go`, `grabConn()` registers the consumer handler
**after** the subscribe RPC returns:
```go
// Line 2053: subscribe RPC blocks until broker responds
res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(...)
// Lines 2074-2076: handler registered AFTER subscribe completes
pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
err = pc._getConn().AddConsumeHandler(pc.consumerID, pc) // ← too late
```
The broker sends `ACTIVE_CONSUMER_CHANGE` and `MESSAGE` commands on the same
connection immediately after the subscribe RPC succeeds. These are processed by
the connection's read goroutine, which looks up the consumer handler by ID.
Since `AddConsumeHandler` hasn't been called yet, the lookup fails and the
commands are discarded.
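The ordering problem can be reproduced deterministically in a toy model. The sketch below is not the client's code: `conn`, `register`, and `dispatch` are hypothetical stand-ins for the connection, `AddConsumeHandler`, and the read goroutine's lookup. It is single-threaded for clarity, so it omits the locking a real handler map would need.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for a broker command addressed to a consumer.
type frame struct {
	consumerID uint64
	kind       string
}

// conn is a toy model of a connection: a handler map keyed by consumer ID.
type conn struct {
	handlers map[uint64]chan frame
}

func newConn() *conn { return &conn{handlers: make(map[uint64]chan frame)} }

// register plays the role of AddConsumeHandler in this model.
func (c *conn) register(id uint64) chan frame {
	ch := make(chan frame, 8)
	c.handlers[id] = ch
	return ch
}

// dispatch plays the role of the read goroutine's lookup. It reports whether
// the frame reached a handler; an unknown consumer ID means the frame is dropped.
func (c *conn) dispatch(f frame) bool {
	ch, ok := c.handlers[f.consumerID]
	if !ok {
		return false
	}
	ch <- f
	return true
}

// deliveredCount replays the same two broker frames against either ordering
// and returns how many of them reached the handler.
func deliveredCount(registerFirst bool) int {
	c := newConn()
	frames := []frame{{1, "ACTIVE_CONSUMER_CHANGE"}, {1, "MESSAGE"}}
	if registerFirst {
		c.register(1)
	}
	n := 0
	for _, f := range frames {
		if c.dispatch(f) {
			n++
		}
	}
	if !registerFirst {
		c.register(1) // too late: both frames were already dropped
	}
	return n
}

func main() {
	fmt.Println("register after frames arrive: ", deliveredCount(false))
	fmt.Println("register before frames arrive:", deliveredCount(true))
}
```

In this model, registering after the frames arrive delivers none of them, while registering first delivers both.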
### Proposed fix
Register the consumer handler **before** the subscribe RPC by obtaining the
connection separately via `cnxPool.GetConnection`, calling `AddConsumeHandler`,
then sending the RPC via `RequestOnCnx` on the same connection. On failure,
clean up with `DeleteConsumeHandler`. This is safe because the
`partitionConsumer` is fully constructed before `grabConn` is called.
```go
// Obtain the connection before sending the RPC
cnx, err := pc.client.cnxPool.GetConnection(...)
if err != nil {
	return err
}
// Register the handler BEFORE the subscribe RPC so no frames are missed.
// AddConsumeHandler returns an error in the current grabConn, so check it here too.
if err := cnx.AddConsumeHandler(pc.consumerID, pc); err != nil {
	return err
}
res, err := pc.client.rpcClient.RequestOnCnx(cnx, ...)
if err != nil {
	cnx.DeleteConsumeHandler(pc.consumerID) // clean up on failure
	return err
}
pc._setConn(res.Cnx)
```
### System configuration
| | |
|---|---|
| **Pulsar version** | 4.x |
| **Client version** | pulsar-client-go v0.18.0 (also present on current master) |
| **Go version** | 1.25 |