aleks-lazic opened a new issue, #1473:
URL: https://github.com/apache/pulsar-client-go/issues/1473

   ### Expected behavior
   
   A failover consumer should receive all `ActiveConsumerChange` notifications 
and all messages published after the subscribe RPC succeeds. When a consumer 
subscribes to a failover subscription, the broker delivers messages and state 
changes to it — the Go client should route them to the consumer's 
`MessageChannel` and event listener.
   
   ### Actual behavior
   
   `MESSAGE` and `ACTIVE_CONSUMER_CHANGE` commands sent by the broker 
immediately after the subscribe RPC succeeds are silently dropped by the Go 
client. The client logs:
   ```
   WARN  Consumer not found while active consumer change  consumerID=N
   WARN  Got unexpected message                           consumerID=N
   ```
   The consumer is subscribed and active from the broker's perspective — the 
broker correctly delivers the commands. But the Go client has not yet 
registered the consumer handler in its internal map, so `handleMessage` 
(connection.go:779-789) and `handleActiveConsumerChange` 
(connection.go:931-935) cannot find the consumer and discard the commands.
   
   This race exists on **every** subscribe call — initial or resubscription. 
Any `ActiveConsumerChange` or `MESSAGE` command delivered by the broker during 
the registration window is dropped. In practice the race is most reliably 
triggered during resubscription, because the broker delivers state immediately 
on a connection it has already been communicating on. But the underlying window 
exists unconditionally.
   
   While the broker will eventually redeliver unacked messages, this creates a 
serious correctness hazard for consumers using `AckCumulative`. A later message 
may be acknowledged cumulatively, implicitly acknowledging the dropped message 
before the application ever processes it. The message is then permanently lost 
from the application's perspective — with no error, no warning, and no way to 
detect it. Cumulative acknowledgment is the natural pattern for failover 
consumers, making this the most likely production scenario to be affected.
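
   The cumulative-ack hazard can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `subscriptionCursor`, `ackCumulative`, and `lostMessages` are hypothetical names invented for the illustration, not Pulsar or pulsar-client-go APIs, and message IDs are simplified to integers.

```go
package main

import "fmt"

// subscriptionCursor is a hypothetical stand-in for the broker-side
// acknowledgment position of a subscription. It is not a Pulsar type.
type subscriptionCursor struct {
	markDelete int // every message ID <= markDelete counts as acknowledged
}

// ackCumulative acknowledges id and every message before it.
func (c *subscriptionCursor) ackCumulative(id int) {
	if id > c.markDelete {
		c.markDelete = id
	}
}

// isAcked reports whether id is covered by the cumulative position,
// meaning the broker would no longer redeliver it.
func (c *subscriptionCursor) isAcked(id int) bool {
	return id <= c.markDelete
}

// lostMessages returns the IDs that are acknowledged even though the
// application never processed them.
func lostMessages(sent []int, processed map[int]bool, c *subscriptionCursor) []int {
	var lost []int
	for _, id := range sent {
		if c.isAcked(id) && !processed[id] {
			lost = append(lost, id)
		}
	}
	return lost
}

func main() {
	cursor := &subscriptionCursor{}
	sent := []int{1, 2, 3}
	processed := map[int]bool{}

	// Message 1 arrives during the registration window and is dropped by
	// the client. Messages 2 and 3 reach the application normally.
	for _, id := range []int{2, 3} {
		processed[id] = true
	}

	// The application cumulatively acknowledges the last message it saw.
	cursor.ackCumulative(3)

	fmt.Println("acknowledged but never processed:", lostMessages(sent, processed, cursor))
}
```

   In this model, acknowledging message 3 also covers message 1, so nothing remains for the broker to redeliver.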
   
   ### Steps to reproduce
   
   The race is reliably triggered when a failover consumer closes and 
resubscribes — a normal operational pattern when reacting to `BecameInactive`. 
The broker sends `ActiveConsumerChange` on the new connection immediately, 
racing with `AddConsumeHandler`.
   
   The key sequence, taken from a log trace of a real occurrence:
   
   1. Consumer 1 subscribes and enters `runLoop` (`has_persister=false`, not 
yet active)
   2. A price update is published and delivered by the broker
   3. Consumer 1 receives `BecameInactive` (`msg_count=1`) — the message was
      received but the persister wasn't ready, so it could not be processed.
      Consumer 1 is closed and the service resubscribes.
   4. Consumer 3 is created. The broker sends `ActiveConsumerChange` immediately,
      **before `AddConsumeHandler` is called**. The notification is dropped.
   5. `BecameActive` finally arrives again ~3 seconds later (broker retransmit).
      The persister starts, but the original message was already consumed by
      consumer 1, never processed, and silently lost.
   
   ### Root cause
   
   This is a race condition between the connection's read goroutine (which 
dispatches incoming frames) and the consumer registration via 
`AddConsumeHandler`.
   
   In `consumer_partition.go`, `grabConn()` registers the consumer handler 
**after** the subscribe RPC returns:
   ```go
   // Line 2053: subscribe RPC blocks until broker responds
   res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(...)
   
   // Lines 2074-2076: handler registered AFTER subscribe completes
   pc._setConn(res.Cnx)
   pc.log.Info("Connected consumer")
   err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)  // ← too late
   ```
   
   The broker sends `ACTIVE_CONSUMER_CHANGE` and `MESSAGE` commands on the same 
connection immediately after the subscribe RPC succeeds. These are processed by 
the connection's read goroutine, which looks up the consumer handler by ID. 
Since `AddConsumeHandler` hasn't been called yet, the lookup fails and the 
commands are discarded.
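
   The lookup-then-drop behavior described above can be reproduced deterministically with a toy model. This is a sketch, not the client's code: `frame`, `conn`, `addHandler`, and `dispatch` are hypothetical names, and the real read goroutine is replaced by direct calls so the ordering is explicit.

```go
package main

import (
	"fmt"
	"sync"
)

// frame is a hypothetical stand-in for a broker command addressed to
// one consumer.
type frame struct {
	consumerID uint64
	kind       string
}

// conn models only the consumer-handler map of a connection.
type conn struct {
	mu       sync.Mutex
	handlers map[uint64]chan frame
	dropped  []frame
}

func newConn() *conn {
	return &conn{handlers: make(map[uint64]chan frame)}
}

// addHandler registers the channel that receives frames for id.
func (c *conn) addHandler(id uint64, ch chan frame) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers[id] = ch
}

// dispatch does what a read goroutine would do per frame: look up the
// handler by consumer ID, then deliver the frame or drop it.
func (c *conn) dispatch(f frame) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch, ok := c.handlers[f.consumerID]
	if !ok {
		c.dropped = append(c.dropped, f)
		return false
	}
	ch <- f // the channel is buffered in this sketch, so the send does not block
	return true
}

func main() {
	c := newConn()
	inbox := make(chan frame, 4)

	// The broker pushes state right after the subscribe RPC succeeds, but
	// the handler is not registered yet, so the frame is dropped.
	early := c.dispatch(frame{consumerID: 3, kind: "ACTIVE_CONSUMER_CHANGE"})
	fmt.Println("delivered before registration:", early)

	// Once the handler is registered, later frames are routed correctly.
	c.addHandler(3, inbox)
	late := c.dispatch(frame{consumerID: 3, kind: "MESSAGE"})
	fmt.Println("delivered after registration:", late)
	fmt.Println("dropped frames:", len(c.dropped))
}
```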
   
   ### Proposed fix
   
   Register the consumer handler **before** the subscribe RPC by obtaining the 
connection separately via `cnxPool.GetConnection`, calling `AddConsumeHandler`, 
then sending the RPC via `RequestOnCnx` on the same connection. On failure, 
clean up with `DeleteConsumeHandler`. This is safe because the 
`partitionConsumer` is fully constructed before `grabConn` is called.
   ```go
   // Obtain connection before sending the RPC
   cnx, err := pc.client.cnxPool.GetConnection(...)
   if err != nil {
       return err
   }
   
   // Register handler BEFORE the subscribe RPC so no frames are missed.
   // AddConsumeHandler returns an error (see the current code), so check it.
   if err := cnx.AddConsumeHandler(pc.consumerID, pc); err != nil {
       return err
   }
   
   res, err := pc.client.rpcClient.RequestOnCnx(cnx, ...)
   if err != nil {
       cnx.DeleteConsumeHandler(pc.consumerID) // clean up on failure
       return err
   }
   
   pc._setConn(res.Cnx)
   ```
   
   ### System configuration
   
   | Component | Version |
   |---|---|
   | **Pulsar version** | 4.x |
   | **Client version** | pulsar-client-go v0.18.0 (also present on current master) |
   | **Go version** | 1.25 |

