aleks-lazic commented on code in PR #1476:
URL: https://github.com/apache/pulsar-client-go/pull/1476#discussion_r3055725343


##########
pulsar/consumer_partition.go:
##########
@@ -2181,18 +2181,35 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
                cmdSubscribe.ForceTopicCreation = proto.Bool(false)
        }
 
-       res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
-               pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+       // Obtain the connection before sending the subscribe RPC so we can register
+       // the consumer handler before the broker starts delivering frames.
+       // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+       // arriving immediately after the subscribe response were silently dropped
+       // because AddConsumeHandler had not been called yet.
+       cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to get connection")
+               return err
+       }
+
+       // Register handler BEFORE the subscribe RPC so no frames are missed
+       err = cnx.AddConsumeHandler(pc.consumerID, pc)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to add consumer handler")
+               return err
+       }

Review Comment:
Good catch: I confirmed this is a real panic path. MessageReceived → discardCorruptedMessage → pc._getConn() dereferences nil if the broker delivers a frame before _setConn is called.
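Registering the handler before the subscribe RPC opens this path because the connection routes consumer frames by consumer ID: with no registered handler a frame is dropped, and once a handler is registered it runs as soon as a frame arrives, whether or not _setConn has happened yet. A minimal model, assuming a map-based registry; `connection`, `dispatch`, `consumerHandler` and `recordingConsumer` are hypothetical and not the client's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// consumerHandler is a hypothetical, reduced stand-in for the handler a
// connection dispatches consumer frames to.
type consumerHandler interface {
	MessageReceived(payload string)
}

// connection models only a registry of handlers keyed by consumer ID.
type connection struct {
	mu       sync.RWMutex
	handlers map[uint64]consumerHandler
}

func newConnection() *connection {
	return &connection{handlers: make(map[uint64]consumerHandler)}
}

// AddConsumeHandler is modelled after the call in the diff; this version
// simply stores the handler and never fails.
func (c *connection) AddConsumeHandler(id uint64, h consumerHandler) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers[id] = h
	return nil
}

// dispatch delivers a MESSAGE frame and reports whether a handler got it.
func (c *connection) dispatch(id uint64, payload string) bool {
	c.mu.RLock()
	h, ok := c.handlers[id]
	c.mu.RUnlock()
	if !ok {
		return false // silently dropped: no handler registered yet
	}
	h.MessageReceived(payload) // runs immediately, regardless of consumer state
	return true
}

// recordingConsumer records every payload it receives.
type recordingConsumer struct{ got []string }

func (r *recordingConsumer) MessageReceived(p string) { r.got = append(r.got, p) }

func main() {
	cnx := newConnection()
	rc := &recordingConsumer{}

	// A frame that races ahead of registration is lost.
	dropped := !cnx.dispatch(7, "m1")
	fmt.Println("dropped before AddConsumeHandler:", dropped)

	_ = cnx.AddConsumeHandler(7, rc)

	// After registration the handler runs straight away.
	delivered := cnx.dispatch(7, "m2")
	fmt.Println("delivered after AddConsumeHandler:", delivered, rc.got)
}
```

In this model the handler's state must already be valid at registration time, which is the reason the connection has to be set first.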
   
Fix: I moved _setConn(cnx) before AddConsumeHandler, and if either AddConsumeHandler or the subscribe RPC fails, the previous connection is restored (if one existed). The ordering is now:
   
GetConnection → _setConn(cnx) → AddConsumeHandler → subscribe RPC
   
Added TestGrabConn_MessageReceivedDuringSubscribe_NilConn, which reproduces the exact panic: it creates a consumer with no prior connection, injects a MessageReceived call during the subscribe RPC, and asserts that no panic occurs. Also added a newGrabConnTestConsumerNoConn helper that does not pre-set the connection via _setConn, simulating the real first-call path.
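The shape of that test can be approximated without a broker by injecting a MessageReceived-style call from inside the subscribe step and comparing the original ordering (_setConn after subscribe) with the fixed one. This is a hypothetical stand-alone sketch, not the PR's TestGrabConn_MessageReceivedDuringSubscribe_NilConn; `messageReceived`, `panicsDuringSubscribe` and `newTestConsumerNoConn` are illustrative names:

```go
package main

import "fmt"

// conn is a hypothetical placeholder for a broker connection.
type conn struct{ id string }

type partitionConsumer struct{ cnx *conn }

func (pc *partitionConsumer) _setConn(c *conn) { pc.cnx = c }
func (pc *partitionConsumer) _getConn() *conn  { return pc.cnx }

// messageReceived stands in for MessageReceived → discardCorruptedMessage:
// it dereferences the current connection without a nil check.
func (pc *partitionConsumer) messageReceived() string { return pc._getConn().id }

// grabConn models only the ordering. setConnFirst selects the fixed order;
// in both cases the handler is assumed to be registered already, so a
// frame may arrive while subscribe is in flight.
func (pc *partitionConsumer) grabConn(setConnFirst bool, subscribe func()) {
	cnx := &conn{id: "broker"}
	if setConnFirst {
		pc._setConn(cnx)
	}
	subscribe()
	if !setConnFirst {
		pc._setConn(cnx)
	}
}

// newTestConsumerNoConn plays the role of a helper that never sets a
// connection, i.e. the first-call path.
func newTestConsumerNoConn() *partitionConsumer { return &partitionConsumer{} }

// panicsDuringSubscribe injects a frame during subscribe and reports
// whether handling it panicked.
func panicsDuringSubscribe(setConnFirst bool) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	pc := newTestConsumerNoConn()
	pc.grabConn(setConnFirst, func() { pc.messageReceived() })
	return false
}

func main() {
	fmt.Println("original ordering panics:", panicsDuringSubscribe(false))
	fmt.Println("fixed ordering panics:", panicsDuringSubscribe(true))
}
```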



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
