aleks-lazic commented on code in PR #1476:
URL: https://github.com/apache/pulsar-client-go/pull/1476#discussion_r3055737081


##########
pulsar/consumer_partition.go:
##########
@@ -2181,18 +2181,35 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
                cmdSubscribe.ForceTopicCreation = proto.Bool(false)
        }
 
-       res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
-               pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+       // Obtain the connection before sending the subscribe RPC so we can register
+       // the consumer handler before the broker starts delivering frames.
+       // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+       // arriving immediately after the subscribe response were silently dropped
+       // because AddConsumeHandler had not been called yet.
+       cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to get connection")
+               return err
+       }
+
+       // Register handler BEFORE the subscribe RPC so no frames are missed
+       err = cnx.AddConsumeHandler(pc.consumerID, pc)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to add consumer handler")
+               return err
+       }
 
+       res, err := pc.client.rpcClient.RequestOnCnx(cnx, requestID, pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
        if err != nil {

Review Comment:
   Added `TestGrabConn_MessageReceivedDuringSubscribe_NilConn` in the latest
commit. It injects a `MessageReceived` call with an invalid payload while
`RequestOnCnx` is in flight, which exercises the `MessageReceived` →
`discardCorruptedMessage` → `pc._getConn()` path and asserts that it does not
panic. This covers both the MESSAGE delivery race and the nil-conn hazard.
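
   For readers following along, here is a minimal, self-contained sketch of the
ordering issue the test targets. It is not the code from this PR and does not
use the pulsar-client-go API: `fakeConn`, `fakeConsumer`, `grabConnSketch`, and
the `registerFirst` flag are all hypothetical names invented for illustration,
and the "broker" is simulated by delivering one frame synchronously while the
subscribe call is still in progress.

```go
package main

import (
	"errors"
	"fmt"
)

// frameHandler is a hypothetical stand-in for a consumer handler interface.
type frameHandler interface {
	MessageReceived(payload []byte) error
}

// fakeConn is a hypothetical connection that routes frames by consumer ID.
type fakeConn struct {
	handlers map[uint64]frameHandler
	dropped  int // frames that arrived with no handler registered
}

func newFakeConn() *fakeConn {
	return &fakeConn{handlers: make(map[uint64]frameHandler)}
}

func (c *fakeConn) AddConsumeHandler(id uint64, h frameHandler) error {
	if _, ok := c.handlers[id]; ok {
		return errors.New("handler already registered")
	}
	c.handlers[id] = h
	return nil
}

// deliver routes one frame, counting it as dropped if no handler exists yet.
func (c *fakeConn) deliver(id uint64, payload []byte) {
	h, ok := c.handlers[id]
	if !ok {
		c.dropped++
		return
	}
	_ = h.MessageReceived(payload)
}

// subscribe simulates a subscribe request during which the broker already
// pushes a frame to the consumer, before the call returns.
func (c *fakeConn) subscribe(id uint64) error {
	c.deliver(id, []byte("invalid payload"))
	return nil
}

// fakeConsumer is a hypothetical consumer whose conn stays nil until
// the subscribe call has completed.
type fakeConsumer struct {
	conn      *fakeConn
	received  int
	discarded int
}

func (pc *fakeConsumer) MessageReceived(payload []byte) error {
	pc.received++
	if pc.conn == nil {
		// Nil-conn hazard: discard without dereferencing the connection.
		pc.discarded++
		return errors.New("connection not set yet; frame discarded")
	}
	return nil
}

// grabConnSketch runs the two steps in either order so the effect of the
// ordering can be observed.
func grabConnSketch(c *fakeConn, pc *fakeConsumer, id uint64, registerFirst bool) error {
	if registerFirst {
		if err := c.AddConsumeHandler(id, pc); err != nil {
			return err
		}
	}
	if err := c.subscribe(id); err != nil {
		return err
	}
	if !registerFirst {
		if err := c.AddConsumeHandler(id, pc); err != nil {
			return err
		}
	}
	pc.conn = c
	return nil
}

func main() {
	for _, registerFirst := range []bool{false, true} {
		c, pc := newFakeConn(), &fakeConsumer{}
		if err := grabConnSketch(c, pc, 1, registerFirst); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("registerFirst=%v dropped=%d received=%d discarded=%d\n",
			registerFirst, c.dropped, pc.received, pc.discarded)
	}
}
```

   In this sketch, registering after the subscribe call loses the early frame,
while registering first routes it to the handler, which must then tolerate a
connection that has not been set yet.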



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
