aleks-lazic commented on code in PR #1476:
URL: https://github.com/apache/pulsar-client-go/pull/1476#discussion_r3055741749


##########
pulsar/consumer_partition_test.go:
##########
@@ -347,3 +352,267 @@ func TestMessageReceivedAllMessagesDuplicate(t 
*testing.T) {
        default:
        }
 }
+
+// TestGrabConn_HandlerRegisteredBeforeSubscribe verifies that the consumer
+// handler is registered on the connection BEFORE the subscribe RPC is sent.
+//
+// Without this ordering, the broker can send MESSAGE and ACTIVE_CONSUMER_CHANGE
+// frames immediately after the subscribe succeeds, but the client's read
+// goroutine cannot route them because the handler isn't in the map yet.
+// Those frames are silently dropped.
+func TestGrabConn_HandlerRegisteredBeforeSubscribe(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.NoError(t, err)
+
+       assert.True(t, rpc.handlerRegisteredDuringRPC.Load(),
+               "AddConsumeHandler must be called before the subscribe RPC is 
sent")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeFailure verifies that when the
+// subscribe RPC fails, the pre-registered consumer handler is removed from
+// the connection so it does not leak.
+func TestGrabConn_HandlerRemovedOnSubscribeFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{
+               cnx:          cnx,
+               subscribeErr: fmt.Errorf("broker rejected subscribe"),
+       }
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.Error(t, err)
+
+       assert.True(t, cnx.handlerRemoved.Load(),
+               "DeleteConsumeHandler must be called when subscribe fails")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeTimeout verifies cleanup on timeout
+// and that the close command is sent on the same connection (not a potentially
+// different one from the pool).
+func TestGrabConn_HandlerRemovedOnSubscribeTimeout(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{
+               cnx:          cnx,
+               subscribeErr: internal.ErrRequestTimeOut,
+       }
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.ErrorIs(t, err, internal.ErrRequestTimeOut)
+
+       assert.True(t, cnx.handlerRemoved.Load(),
+               "DeleteConsumeHandler must be called on timeout")
+       assert.True(t, rpc.closeSentOnCnx.Load(),
+               "CloseConsumer must be sent via RequestOnCnx on the same 
connection")
+}
+
+// TestGrabConn_BrokerFrameDuringSubscribe simulates the exact race: the broker
+// sends a frame (e.g. ActiveConsumerChange) while the subscribe RPC is still
+// in flight. Because the handler is registered before the RPC, the frame
+// must be delivered to the consumer — not dropped.
+func TestGrabConn_BrokerFrameDuringSubscribe(t *testing.T) {
+       cnx := newSpyConnection()
+       var consumerReceivedChange atomic.Bool
+
+       rpc := &grabConnSpyRPCClient{
+               cnx: cnx,
+               duringSubscribe: func() {
+                       // Simulate the broker's read goroutine delivering a frame
+                       // while the subscribe RPC is in flight.
+                       if handler, ok := 
cnx.handler.Load().(*partitionConsumer); ok && handler != nil {
+                               handler.ActiveConsumerChanged(true)
+                               consumerReceivedChange.Store(true)
+                       }
+               },
+       }
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.NoError(t, err)
+
+       assert.True(t, consumerReceivedChange.Load(),
+               "Frames sent by the broker during the subscribe RPC must reach 
the consumer handler")
+}
+
+// TestGrabConn_GetConnectionFailure verifies that grabConn returns the error
+// from GetConnection without registering a handler or sending an RPC.
+func TestGrabConn_GetConnectionFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       // Override the pool to return an error
+       pc.client.cnxPool = &grabConnMockPool{err: fmt.Errorf("connection 
refused")}
+
+       err := pc.grabConn("")
+       assert.ErrorContains(t, err, "connection refused")
+
+       assert.False(t, cnx.handlerRegistered.Load(),
+               "AddConsumeHandler must not be called when GetConnection fails")
+}
+
+// TestGrabConn_AddConsumeHandlerFailure verifies that grabConn returns the
+// error from AddConsumeHandler without sending a subscribe RPC.
+func TestGrabConn_AddConsumeHandlerFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       cnx.addHandlerErr = fmt.Errorf("connection closed")
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.ErrorContains(t, err, "connection closed")
+
+       assert.False(t, rpc.handlerRegisteredDuringRPC.Load(),
+               "Subscribe RPC must not be sent when AddConsumeHandler fails")
+}
+
+// --- Helpers
+
+// newGrabConnTestConsumer builds a minimal partitionConsumer wired to the
+// given spy connection and RPC client, suitable for testing grabConn.
+func newGrabConnTestConsumer(cnx *spyConnection, rpc *grabConnSpyRPCClient) 
*partitionConsumer {
+       brokerURL, _ := url.Parse("pulsar://localhost:6650")
+       if rpc.lookupResult == nil {
+               rpc.lookupResult = &internal.LookupResult{
+                       LogicalAddr:  brokerURL,
+                       PhysicalAddr: brokerURL,
+               }
+       }
+       pool := &grabConnMockPool{cnx: cnx}
+
+       c := &client{
+               cnxPool:   pool,
+               rpcClient: rpc,
+               log:       log.DefaultNopLogger(),
+       }
+
+       pc := &partitionConsumer{
+               client:               c,
+               topic:                "persistent://public/default/test",
+               options:              &partitionConsumerOpts{subscription: 
"sub"},
+               log:                  log.DefaultNopLogger(),
+               compressionProviders: sync.Map{},
+               connectedCh:          make(chan struct{}, 1),
+               metrics:              newTestMetrics(),
+       }
+       // Required: lookupTopic calls _getConn().IsProxied() when assignedBrokerURL != "".
+       // grabConn will overwrite this with the same connection after a successful subscribe.
+       pc._setConn(cnx)

Review Comment:
   Added `newGrabConnTestConsumerNoConn`, a separate helper that skips the
`_setConn` pre-initialization to simulate the real first-call path.
`TestGrabConn_MessageReceivedDuringSubscribe_NilConn` uses it, and it
reproduced the panic before the fix was applied.
   
   We kept the original `newGrabConnTestConsumer` (which pre-sets the
connection) for the other tests, which do not exercise the nil-conn path: they
focus on different concerns (handler ordering, cleanup on failure, etc.), and
pre-setting the connection avoids unrelated noise. Having two helpers makes the
intent explicit: "with prior connection" vs. "fresh consumer, no connection."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to