aleks-lazic opened a new pull request, #1476: URL: https://github.com/apache/pulsar-client-go/pull/1476
### Motivation
`MESSAGE` and `ACTIVE_CONSUMER_CHANGE` frames that the broker sends
immediately after a successful subscribe RPC are dropped before they reach
the consumer. The client only logs
`Consumer not found while active consumer change` and
`Got unexpected message`; the frames themselves are permanently lost.
This happens because `grabConn()` calls `AddConsumeHandler` **after** the
subscribe RPC returns. The broker starts delivering frames as soon as the
subscribe succeeds, but the connection's read goroutine cannot find the handler
yet and discards them.
This is a correctness hazard for consumers using `AckCumulative`: a later
message acknowledged cumulatively can implicitly acknowledge the dropped
message before the application ever processes it, which results in permanent,
silent message loss.
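The race described above can be reproduced in miniature. The following is only a sketch with hypothetical names (`dispatcher`, `register`, `deliver`, `lateRegistration`); it does not use the real client types, and it assumes a plain map of handlers keyed by consumer ID as a stand-in for the connection's handler table.

```go
package main

import "sync"

// dispatcher is a hypothetical stand-in for the connection's handler table.
type dispatcher struct {
	mu       sync.Mutex
	handlers map[uint64]func(msgID int)
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: make(map[uint64]func(msgID int))}
}

// register adds the handler for a consumer ID.
func (d *dispatcher) register(consumerID uint64, h func(msgID int)) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.handlers[consumerID] = h
}

// deliver routes a frame to its handler and reports whether a handler existed.
// A frame with no registered handler is discarded.
func (d *dispatcher) deliver(consumerID uint64, msgID int) bool {
	d.mu.Lock()
	h, ok := d.handlers[consumerID]
	d.mu.Unlock()
	if !ok {
		return false
	}
	h(msgID)
	return true
}

// lateRegistration replays the problematic order: the broker delivers
// message 1 before the handler is registered, then message 2 afterwards.
// It returns the messages the application processed, the ID it would
// acknowledge cumulatively, and whether message 1 was delivered.
func lateRegistration() (processed []int, ackedThrough int, firstDelivered bool) {
	d := newDispatcher()
	const consumerID = 1

	// Subscribe has succeeded, but no handler is registered yet.
	firstDelivered = d.deliver(consumerID, 1)

	d.register(consumerID, func(msgID int) {
		processed = append(processed, msgID)
	})
	d.deliver(consumerID, 2)

	// A cumulative ack of the last processed message also covers message 1.
	ackedThrough = processed[len(processed)-1]
	return processed, ackedThrough, firstDelivered
}
```

In this sketch the application processes only message 2, yet its cumulative acknowledgement through ID 2 also covers message 1, which it never saw.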
### Modifications
Split `RequestWithCnxKeySuffix` (which is internally `GetConnection` +
`RequestOnCnx`) into its two constituent operations and insert
`AddConsumeHandler` in between, so the handler is registered before the broker
can send any frames.
On subscribe failure, `DeleteConsumeHandler` cleans up the pre-registered
handler. The timeout path sends `CloseConsumer` via `RequestOnCnx` on the same
connection.
This mirrors the existing pattern in `producer_partition.go`, which already
performs `GetConnection` → `RegisterListener` → `RequestOnCnx`.
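The ordering described in this section can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `fakeConn`, `subscribeFn`, `errSubscribeFailed`, and `grabConnSketch` are hypothetical, the method signatures are simplified and do not match the real client, and the timeout path with `CloseConsumer` is omitted.

```go
package main

import "errors"

// errSubscribeFailed is a hypothetical error used to simulate a failed subscribe RPC.
var errSubscribeFailed = errors.New("subscribe failed")

// fakeConn is a hypothetical stand-in for a broker connection. Its method
// names echo those in the PR description, but the signatures are simplified.
type fakeConn struct {
	handlers map[uint64]string
}

// AddConsumeHandler registers a handler for the consumer ID.
func (c *fakeConn) AddConsumeHandler(id uint64, name string) error {
	if _, dup := c.handlers[id]; dup {
		return errors.New("handler already registered")
	}
	c.handlers[id] = name
	return nil
}

// DeleteConsumeHandler removes the handler for the consumer ID.
func (c *fakeConn) DeleteConsumeHandler(id uint64) {
	delete(c.handlers, id)
}

// subscribeFn stands in for sending the subscribe RPC on a given connection.
type subscribeFn func(c *fakeConn) error

// grabConnSketch assumes the connection has already been obtained (step 1),
// registers the handler (step 2), and only then sends the subscribe RPC
// (step 3). If the RPC fails, the pre-registered handler is removed.
func grabConnSketch(c *fakeConn, id uint64, subscribe subscribeFn) error {
	if err := c.AddConsumeHandler(id, "consumer"); err != nil {
		return err // early return: no RPC is sent
	}
	if err := subscribe(c); err != nil {
		c.DeleteConsumeHandler(id) // avoid leaking the handler
		return err
	}
	return nil
}
```

Because the handler is in the map before the RPC goes out, any frame the broker sends while the RPC is still in flight can be routed to the consumer.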
### Verifying this change
This change added tests and can be verified as follows:
- `TestGrabConn_HandlerRegisteredBeforeSubscribe`: handler is in the map before the subscribe RPC
- `TestGrabConn_HandlerRemovedOnSubscribeFailure`: no handler leak on error
- `TestGrabConn_HandlerRemovedOnSubscribeTimeout`: cleanup on timeout, close sent on same connection
- `TestGrabConn_BrokerFrameDuringSubscribe`: broker frame arriving mid-RPC reaches the consumer
- `TestGrabConn_GetConnectionFailure`: early return, no handler registered
- `TestGrabConn_AddConsumeHandlerFailure`: early return, no RPC sent
### Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API: no
- The schema: no
- The default values of configurations: no
- The wire protocol: no
### Documentation
- Does this pull request introduce a new feature? no
