aleks-lazic opened a new pull request, #1476:
URL: https://github.com/apache/pulsar-client-go/pull/1476

### Motivation

`MESSAGE` and `ACTIVE_CONSUMER_CHANGE` frames that the broker sends immediately after a successful subscribe RPC are dropped. The client logs `Consumer not found while active consumer change` and `Got unexpected message`, but the frames themselves are permanently lost.

This happens because `grabConn()` calls `AddConsumeHandler` **after** the subscribe RPC returns. The broker starts delivering frames as soon as the subscribe succeeds, but the connection's read goroutine cannot find the handler yet and discards them.
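The ordering problem can be reproduced with a deliberately simplified model of the dispatch path. In this sketch, `conn`, `frame`, `dispatch`, and `subscribeAndDeliver` are hypothetical names, not pulsar-client-go APIs. The read goroutine is modeled as one goroutine that dispatches a single frame before the simulated subscribe RPC returns to the caller.

```go
package main

import (
	"fmt"
	"sync"
)

// frame is a hypothetical stand-in for a broker command (such as MESSAGE)
// addressed to a consumer ID.
type frame struct {
	consumerID uint64
	payload    string
}

// conn is a simplified model of a connection's consumer-handler map and
// read loop; it is not the pulsar-client-go implementation.
type conn struct {
	mu       sync.Mutex
	handlers map[uint64]func(frame)
	dropped  int
}

func (c *conn) addConsumeHandler(id uint64, h func(frame)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers[id] = h
}

// dispatch is what the read goroutine does with each frame: look up the
// handler by consumer ID and drop the frame if none is registered yet.
func (c *conn) dispatch(f frame) {
	c.mu.Lock()
	h, ok := c.handlers[f.consumerID]
	if !ok {
		c.dropped++
	}
	c.mu.Unlock()
	if ok {
		h(f)
	}
}

// subscribeAndDeliver simulates a subscribe RPC after which the broker
// immediately pushes one MESSAGE frame.
func subscribeAndDeliver(registerBeforeRPC bool) (received, dropped int) {
	c := &conn{handlers: make(map[uint64]func(frame))}
	handler := func(frame) { received++ }
	if registerBeforeRPC {
		c.addConsumeHandler(1, handler) // new ordering
	}

	var readLoop sync.WaitGroup
	readLoop.Add(1)
	go func() { // stands in for the connection's read goroutine
		defer readLoop.Done()
		c.dispatch(frame{consumerID: 1, payload: "msg-0"})
	}()
	readLoop.Wait() // the RPC "returns" only after the frame was dispatched

	if !registerBeforeRPC {
		c.addConsumeHandler(1, handler) // old ordering: too late for msg-0
	}
	return received, c.dropped
}

func main() {
	r, d := subscribeAndDeliver(false)
	fmt.Printf("handler added after RPC:  received=%d dropped=%d\n", r, d)
	r, d = subscribeAndDeliver(true)
	fmt.Printf("handler added before RPC: received=%d dropped=%d\n", r, d)
}
```

With the old ordering the single frame is counted as dropped; registering the handler first delivers it. `readLoop.Wait()` also establishes the happens-before edge that makes reading `received` and `c.dropped` afterwards race-free.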
                 
                                                               
This is a correctness hazard for consumers using `AckCumulative`: a later message acknowledged cumulatively can implicitly acknowledge the dropped message even though the application never received it, resulting in permanent, silent message loss.
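To make the `AckCumulative` hazard concrete, the following sketch models a subscription cursor in which acknowledging ID `n` acknowledges every ID up to and including `n`. The `cursor` type, its `markDelete` field, and the plain integer message IDs are simplifying assumptions; real Pulsar message IDs are composite (ledger ID, entry ID, partition, batch index).

```go
package main

import "fmt"

// cursor is a hypothetical, simplified model of a subscription's
// acknowledgment position: every ID <= markDelete counts as acknowledged.
type cursor struct {
	markDelete int64 // -1 means nothing acknowledged yet
}

// ackCumulative acknowledges id and every ID before it.
func (c *cursor) ackCumulative(id int64) {
	if id > c.markDelete {
		c.markDelete = id
	}
}

func (c *cursor) isAcked(id int64) bool { return id <= c.markDelete }

// lostMessages returns the IDs the broker considers acknowledged but the
// application never processed.
func lostMessages(sent []int64, processed map[int64]bool, c *cursor) []int64 {
	var lost []int64
	for _, id := range sent {
		if c.isAcked(id) && !processed[id] {
			lost = append(lost, id)
		}
	}
	return lost
}

func main() {
	sent := []int64{0, 1, 2}
	// Message 1 arrived before the handler was registered and was dropped,
	// so the application only ever sees messages 0 and 2.
	processed := map[int64]bool{0: true, 2: true}
	c := &cursor{markDelete: -1}
	c.ackCumulative(2)
	fmt.Println("acknowledged but never processed:", lostMessages(sent, processed, c))
}
```

Message 1 ends up acknowledged without ever reaching the application, so the broker will not redeliver it.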
                      
                                                               
### Modifications

Split `RequestWithCnxKeySuffix` (which is internally `GetConnection` + `RequestOnCnx`) into its two constituent operations and insert `AddConsumeHandler` between them, so the handler is registered before the broker can send any frames.

On subscribe failure, `DeleteConsumeHandler` removes the pre-registered handler. On timeout, the handler is also removed, and `CloseConsumer` is sent via `RequestOnCnx` on the same connection.
                                                                                
            
                                                               
This mirrors the existing pattern in `producer_partition.go`, which already does `GetConnection` → `RegisterListener` → `RequestOnCnx`.

### Verifying this change

This change added tests and can be verified as follows:

- `TestGrabConn_HandlerRegisteredBeforeSubscribe`: the handler is in the map before the subscribe RPC is sent.
- `TestGrabConn_HandlerRemovedOnSubscribeFailure`: no handler leaks on error.
- `TestGrabConn_HandlerRemovedOnSubscribeTimeout`: the handler is cleaned up on timeout, and `CloseConsumer` is sent on the same connection.
- `TestGrabConn_BrokerFrameDuringSubscribe`: a broker frame arriving mid-RPC reaches the consumer.
- `TestGrabConn_GetConnectionFailure`: early return; no handler is registered.
- `TestGrabConn_AddConsumeHandlerFailure`: early return; no RPC is sent.

### Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API: no
- The schema: no
- The default values of configurations: no
- The wire protocol: no

### Documentation

- Does this pull request introduce a new feature? no

