Copilot commented on code in PR #1476:
URL: https://github.com/apache/pulsar-client-go/pull/1476#discussion_r3055106548
##########
pulsar/consumer_partition.go:
##########
@@ -2181,18 +2181,35 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}
- res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
- pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+ // Obtain the connection before sending the subscribe RPC so we can
register
+ // the consumer handler before the broker starts delivering frames.
+ // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+ // arriving immediately after the subscribe response were silently
dropped
+ // because AddConsumeHandler had not been called yet.
+ cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to get connection")
+ return err
+ }
+
+ // Register handler BEFORE the subscribe RPC so no frames are missed
+ err = cnx.AddConsumeHandler(pc.consumerID, pc)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to add consumer handler")
+ return err
+ }
+ res, err := pc.client.rpcClient.RequestOnCnx(cnx, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
if err != nil {
Review Comment:
The new grabConn tests only simulate ACTIVE_CONSUMER_CHANGE during the
subscribe RPC. Because the original race also drops MESSAGE frames, add a test
that injects a MESSAGE delivery during RequestOnCnx and asserts that it reaches
MessageReceived without panicking. Such a test would also catch the nil-conn
hazard that arises if pc._setConn is not called before the handler is
registered.
##########
pulsar/consumer_partition_test.go:
##########
@@ -347,3 +352,267 @@ func TestMessageReceivedAllMessagesDuplicate(t
*testing.T) {
default:
}
}
+
+// TestGrabConn_HandlerRegisteredBeforeSubscribe verifies that the consumer
+// handler is registered on the connection BEFORE the subscribe RPC is sent.
+//
+// Without this ordering, the broker can send MESSAGE and
ACTIVE_CONSUMER_CHANGE
+// frames immediately after the subscribe succeeds, but the client's read
+// goroutine cannot route them because the handler isn't in the map yet.
+// Those frames are silently dropped.
+func TestGrabConn_HandlerRegisteredBeforeSubscribe(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.NoError(t, err)
+
+ assert.True(t, rpc.handlerRegisteredDuringRPC.Load(),
+ "AddConsumeHandler must be called before the subscribe RPC is
sent")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeFailure verifies that when the
+// subscribe RPC fails, the pre-registered consumer handler is removed from
+// the connection so it does not leak.
+func TestGrabConn_HandlerRemovedOnSubscribeFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ subscribeErr: fmt.Errorf("broker rejected subscribe"),
+ }
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.Error(t, err)
+
+ assert.True(t, cnx.handlerRemoved.Load(),
+ "DeleteConsumeHandler must be called when subscribe fails")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeTimeout verifies cleanup on timeout
+// and that the close command is sent on the same connection (not a potentially
+// different one from the pool).
+func TestGrabConn_HandlerRemovedOnSubscribeTimeout(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ subscribeErr: internal.ErrRequestTimeOut,
+ }
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.ErrorIs(t, err, internal.ErrRequestTimeOut)
+
+ assert.True(t, cnx.handlerRemoved.Load(),
+ "DeleteConsumeHandler must be called on timeout")
+ assert.True(t, rpc.closeSentOnCnx.Load(),
+ "CloseConsumer must be sent via RequestOnCnx on the same
connection")
+}
+
+// TestGrabConn_BrokerFrameDuringSubscribe simulates the exact race: the broker
+// sends a frame (e.g. ActiveConsumerChange) while the subscribe RPC is still
+// in flight. Because the handler is registered before the RPC, the frame
+// must be delivered to the consumer — not dropped.
+func TestGrabConn_BrokerFrameDuringSubscribe(t *testing.T) {
+ cnx := newSpyConnection()
+ var consumerReceivedChange atomic.Bool
+
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ duringSubscribe: func() {
+ // Simulate the broker's read goroutine delivering a
frame
+ // while the subscribe RPC is in flight.
+ if handler, ok :=
cnx.handler.Load().(*partitionConsumer); ok && handler != nil {
+ handler.ActiveConsumerChanged(true)
+ consumerReceivedChange.Store(true)
+ }
+ },
+ }
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.NoError(t, err)
+
+ assert.True(t, consumerReceivedChange.Load(),
+ "Frames sent by the broker during the subscribe RPC must reach
the consumer handler")
+}
+
+// TestGrabConn_GetConnectionFailure verifies that grabConn returns the error
+// from GetConnection without registering a handler or sending an RPC.
+func TestGrabConn_GetConnectionFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ // Override the pool to return an error
+ pc.client.cnxPool = &grabConnMockPool{err: fmt.Errorf("connection
refused")}
+
+ err := pc.grabConn("")
+ assert.ErrorContains(t, err, "connection refused")
+
+ assert.False(t, cnx.handlerRegistered.Load(),
+ "AddConsumeHandler must not be called when GetConnection fails")
+}
+
+// TestGrabConn_AddConsumeHandlerFailure verifies that grabConn returns the
+// error from AddConsumeHandler without sending a subscribe RPC.
+func TestGrabConn_AddConsumeHandlerFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ cnx.addHandlerErr = fmt.Errorf("connection closed")
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.ErrorContains(t, err, "connection closed")
+
+ assert.False(t, rpc.handlerRegisteredDuringRPC.Load(),
+ "Subscribe RPC must not be sent when AddConsumeHandler fails")
+}
+
+// --- Helpers
+
+// newGrabConnTestConsumer builds a minimal partitionConsumer wired to the
+// given spy connection and RPC client, suitable for testing grabConn.
+func newGrabConnTestConsumer(cnx *spyConnection, rpc *grabConnSpyRPCClient)
*partitionConsumer {
+ brokerURL, _ := url.Parse("pulsar://localhost:6650")
+ if rpc.lookupResult == nil {
+ rpc.lookupResult = &internal.LookupResult{
+ LogicalAddr: brokerURL,
+ PhysicalAddr: brokerURL,
+ }
+ }
+ pool := &grabConnMockPool{cnx: cnx}
+
+ c := &client{
+ cnxPool: pool,
+ rpcClient: rpc,
+ log: log.DefaultNopLogger(),
+ }
+
+ pc := &partitionConsumer{
+ client: c,
+ topic: "persistent://public/default/test",
+ options: &partitionConsumerOpts{subscription:
"sub"},
+ log: log.DefaultNopLogger(),
+ compressionProviders: sync.Map{},
+ connectedCh: make(chan struct{}, 1),
+ metrics: newTestMetrics(),
+ }
+ // Required: lookupTopic calls _getConn().IsProxied() when
assignedBrokerURL != "".
+ // grabConn will overwrite this with the same connection after a
successful subscribe.
+ pc._setConn(cnx)
Review Comment:
newGrabConnTestConsumer pre-initializes pc._setConn(cnx), which masks the
real-world initial-creation path where pc.conn is nil when grabConn starts.
Consider avoiding this pre-set (or making it conditional) so the tests can
catch panics/behavior that depend on grabConn establishing the connection
before handler callbacks (especially for MESSAGE delivery during subscribe).
```suggestion
// Preserve the real initial-creation path by default: grabConn should be
// responsible for establishing pc.conn before subscribe/handler callbacks.
// Only seed a connection for lookup paths that need _getConn().IsProxied()
// when an assigned broker URL is already present.
if rpc.lookupResult != nil && rpc.lookupResult.AssignedBrokerURL != "" {
pc._setConn(cnx)
}
```
##########
pulsar/consumer_partition.go:
##########
@@ -2181,18 +2181,35 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}
- res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
- pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+ // Obtain the connection before sending the subscribe RPC so we can
register
+ // the consumer handler before the broker starts delivering frames.
+ // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+ // arriving immediately after the subscribe response were silently
dropped
+ // because AddConsumeHandler had not been called yet.
+ cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to get connection")
+ return err
+ }
+
+ // Register handler BEFORE the subscribe RPC so no frames are missed
+ err = cnx.AddConsumeHandler(pc.consumerID, pc)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to add consumer handler")
+ return err
+ }
Review Comment:
Registering the consume handler before calling pc._setConn(cnx) can panic if
the broker delivers a MESSAGE/ACK-related frame immediately (or mid-RPC):
MessageReceived constructs message objects using pc._getConn(), which panics if
pc.conn is still nil (fresh consumer creation calls grabConn before any
_setConn). Set the connection pointer before AddConsumeHandler (and if
subscribe fails, restore the previous conn if there was one) so handler
callbacks can safely call pc._getConn().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]