Copilot commented on code in PR #1487:
URL: https://github.com/apache/pulsar-client-go/pull/1487#discussion_r3165407884
##########
pulsar/consumer_impl.go:
##########
@@ -84,6 +84,10 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
return nil, newError(SubscriptionNotFound, "subscription name
is required for consumer")
}
+ if options.PriorityLevel < 0 {
+ return nil, newError(InvalidConfiguration, "priority level must
be >= 0")
+ }
Review Comment:
`PriorityLevel` is validated only for negativity, but it is later cast to
`int32` for the subscribe command. On 64-bit platforms a large positive
`PriorityLevel` can overflow when converted to `int32`, potentially sending a
negative value to the broker. Please also validate that `PriorityLevel` fits
into the protocol field range (<= max int32) and return `InvalidConfiguration`
if it does not.
##########
pulsar/consumer_test.go:
##########
@@ -228,6 +228,223 @@ func TestConsumerWithInvalidConf(t *testing.T) {
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
+func TestConsumerWithInvalidPriorityLevel(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-topic",
+ SubscriptionName: "my-sub",
+ PriorityLevel: -1,
+ })
+
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+ assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
+}
+
+func TestPriorityConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ sub := "sub-shared-priority"
+
+ // High-priority consumers (priority 1)
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // Low-priority consumer (priority 2)
+ consumer3, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 2,
+ })
+ assert.Nil(t, err)
+ defer consumer3.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ // Drain permits from consumer1 and consumer2
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ consumer1.Receive(ctx)
+ cancel()
+ }
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ consumer2.Receive(ctx)
+ cancel()
Review Comment:
The receive loops intended to drain permits ignore both the returned message
and error from `Receive`. If `Receive` times out or fails, the test can still
pass without actually verifying that high-priority consumers received messages
(and therefore without validating priority dispatch). Capture the results and
assert `err == nil` and `msg != nil` for each receive.
##########
pulsar/consumer_test.go:
##########
@@ -228,6 +228,223 @@ func TestConsumerWithInvalidConf(t *testing.T) {
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
+func TestConsumerWithInvalidPriorityLevel(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-topic",
+ SubscriptionName: "my-sub",
+ PriorityLevel: -1,
+ })
+
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+ assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
+}
+
+func TestPriorityConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ sub := "sub-shared-priority"
+
+ // High-priority consumers (priority 1)
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // Low-priority consumer (priority 2)
+ consumer3, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 2,
+ })
+ assert.Nil(t, err)
+ defer consumer3.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ // Drain permits from consumer1 and consumer2
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ consumer1.Receive(ctx)
+ cancel()
+ }
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ consumer2.Receive(ctx)
+ cancel()
+ }
+
+ // Low-priority consumer should not have received any messages
+ ctx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ msg, err := consumer3.Receive(ctx)
+ cancel()
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ consumer1.Close()
+ consumer2.Close()
+ consumer3.Close()
Review Comment:
These consumers are already closed via `defer ...Close()` above, and then
closed again here. This redundancy makes the test harder to follow and can hide
close-related issues; prefer relying on the defers (or remove the defers and
keep explicit closes) but not both.
##########
pulsar/consumer_test.go:
##########
@@ -228,6 +228,223 @@ func TestConsumerWithInvalidConf(t *testing.T) {
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
+func TestConsumerWithInvalidPriorityLevel(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-topic",
+ SubscriptionName: "my-sub",
+ PriorityLevel: -1,
+ })
+
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+ assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
+}
+
+func TestPriorityConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ sub := "sub-shared-priority"
+
+ // High-priority consumers (priority 1)
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // Low-priority consumer (priority 2)
+ consumer3, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 2,
+ })
+ assert.Nil(t, err)
+ defer consumer3.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ // Drain permits from consumer1 and consumer2
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ consumer1.Receive(ctx)
+ cancel()
+ }
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ consumer2.Receive(ctx)
+ cancel()
+ }
+
+ // Low-priority consumer should not have received any messages
+ ctx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ msg, err := consumer3.Receive(ctx)
+ cancel()
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ consumer1.Close()
+ consumer2.Close()
+ consumer3.Close()
+}
+
+func TestFailOverConsumerPriority(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/" +
randomName + "/partitions"
+ makeHTTPCall(t, http.MethodPut, testURL, "9")
+
+ sub := "my-sub"
+
+ // C1 at priority 1
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "aaa",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ // C2 at priority 0 — should take over from C1
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb1",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 0,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // C3 at priority 0
+ consumer3, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb2",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 0,
+ })
+ assert.Nil(t, err)
+ defer consumer3.Close()
+
+ // C4 at priority 0
+ consumer4, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb3",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 0,
+ })
+ assert.Nil(t, err)
+ defer consumer4.Close()
+
+ // C5 at priority 1 — should not get any partitions
+ consumer5, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb4",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer5.Close()
+
+ evenDistribution := 9 / 3 // 3 partitions per priority-0 consumer
+
+ topicName, err := utils.GetTopicName(topic)
+ assert.Nil(t, err)
+
+ cfg := &config.Config{}
+ pulsarAdmin, err := pulsaradmin.NewClient(cfg)
+ assert.NoError(t, err)
+
+ // Poll admin stats until partitions are evenly distributed among
priority-0 consumers
+ retryAssert(t, 20, 500, func() {}, func(at assert.TestingT) bool {
+ stats, err :=
pulsarAdmin.Topics().GetPartitionedStats(*topicName, true)
+ if err != nil {
+ return false
+ }
+ counts := map[string]int{}
+ for _, pStats := range stats.Partitions {
+ subStats, ok := pStats.Subscriptions[sub]
+ if !ok {
+ return false
+ }
+ counts[subStats.ActiveConsumerName]++
+ }
+ return len(counts) == 3 &&
+ counts["bbb1"] == evenDistribution &&
+ counts["bbb2"] == evenDistribution &&
+ counts["bbb3"] == evenDistribution
+ })
+
+ // Final assertion with real test failure
+ stats, err := pulsarAdmin.Topics().GetPartitionedStats(*topicName, true)
+ assert.Nil(t, err)
+ counts := map[string]int{}
+ for _, pStats := range stats.Partitions {
+ subStats := pStats.Subscriptions[sub]
+ counts[subStats.ActiveConsumerName]++
+ }
+ assert.Equal(t, 3, len(counts))
+ assert.Equal(t, evenDistribution, counts["bbb1"])
+ assert.Equal(t, evenDistribution, counts["bbb2"])
+ assert.Equal(t, evenDistribution, counts["bbb3"])
+
+ consumer1.Close()
+ consumer2.Close()
+ consumer3.Close()
+ consumer4.Close()
+ consumer5.Close()
Review Comment:
These explicit `Close()` calls duplicate the `defer consumerX.Close()`
statements above. Please keep one close strategy to avoid redundant calls and
make test cleanup clearer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]