aleks-lazic commented on code in PR #1476:
URL: https://github.com/apache/pulsar-client-go/pull/1476#discussion_r3055725343
##########
pulsar/consumer_partition.go:
##########
@@ -2181,18 +2181,35 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}
- res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
- pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+ // Obtain the connection before sending the subscribe RPC so we can register
+ // the consumer handler before the broker starts delivering frames.
+ // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+ // arriving immediately after the subscribe response were silently dropped
+ // because AddConsumeHandler had not been called yet.
+ cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to get connection")
+ return err
+ }
+
+ // Register handler BEFORE the subscribe RPC so no frames are missed
+ err = cnx.AddConsumeHandler(pc.consumerID, pc)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to add consumer handler")
+ return err
+ }
Review Comment:
Good catch, confirmed: this is a real panic path. MessageReceived →
discardCorruptedMessage → pc._getConn() dereferences nil if the broker
delivers a frame before _setConn is called.
Fix: moved _setConn(cnx) before AddConsumeHandler. If either AddConsumeHandler
or the subscribe RPC fails, the previous connection is restored if one
existed. The ordering is now:
GetConnection → _setConn(cnx) → AddConsumeHandler → subscribe RPC
Added TestGrabConn_MessageReceivedDuringSubscribe_NilConn, which reproduces
the exact panic: it creates a consumer with no prior connection, injects a
MessageReceived call during the subscribe RPC, and asserts that nothing
panics. Also added a newGrabConnTestConsumerNoConn helper that does not call
_setConn up front, simulating the real first-call path.