This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a96e5900 refactor(lite_push_consumer): enable Lite Push Consumer to 
suspend consumption (#1203)
a96e5900 is described below

commit a96e59003f6c93be9d301f5a4eb36e2ab4caed25
Author: Quan <[email protected]>
AuthorDate: Fri Mar 20 15:48:27 2026 +0800

    refactor(lite_push_consumer): enable Lite Push Consumer to suspend 
consumption (#1203)
    
    Change-Id: Ia190bd12af80ded00a20cfe83712918d0f162dff
---
 golang/consumer_service.go            | 144 +++++++++++--
 golang/lite_push_consumer.go          |  23 ++-
 golang/process_queue.go               |  45 ++++-
 golang/protocol/v2/admin.pb.go        |   9 +-
 golang/protocol/v2/admin_grpc.pb.go   |   9 +-
 golang/protocol/v2/definition.pb.go   | 368 ++++++++++++++++++++++++++--------
 golang/protocol/v2/service.pb.go      | 143 ++++++++-----
 golang/protocol/v2/service_grpc.pb.go |  41 ++--
 golang/push_consumer.go               |  70 ++++---
 golang/push_consumer_options.go       |  71 ++++---
 golang/push_consumer_test.go          |  64 ++++++
 11 files changed, 723 insertions(+), 264 deletions(-)

diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 316749fb..7aad7465 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -22,6 +22,20 @@ import (
        "time"
 )
 
+var (
+       messageGroupExtractor = func(mv *MessageView) string {
+               messageGroup := mv.GetMessageGroup()
+               if messageGroup != nil {
+                       return *messageGroup
+               }
+               return ""
+       }
+
+       liteTopicExtractor = func(mv *MessageView) string {
+               return mv.GetLiteTopic()
+       }
+)
+
 type ConsumeService interface {
        consume(ProcessQueue, []*MessageView)
        consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult, 
error))
@@ -93,7 +107,8 @@ func (bcs *baseConsumeService) newConsumeTask(clientId 
string, messageListener M
                }()
                duration := time.Since(startTime)
                status := MessageHookPointsStatus_ERROR
-               if consumeResult == SUCCESS {
+               // Check if result is SUCCESS or SUSPEND (considered as 
successful)
+               if consumeResult.Type == ConsumerResultTypeSuccess || 
consumeResult.Type == ConsumerResultTypeSuspend {
                        status = MessageHookPointsStatus_OK
                }
                messageInterceptor.doAfter(MessageHookPoints_CONSUME, 
[]*MessageCommon{messageView.GetMessageCommon()}, duration, status)
@@ -101,6 +116,7 @@ func (bcs *baseConsumeService) newConsumeTask(clientId 
string, messageListener M
 }
 
 var _ = ConsumeService(&standardConsumeService{})
+var _ = ConsumeService(&liteFifoConsumeService{})
 
 type standardConsumeService struct {
        baseConsumeService
@@ -133,37 +149,50 @@ func NewStandardConsumeService(clientId string, 
messageListener MessageListener,
        }
 }
 
-func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews 
[]*MessageView) {
-       if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
-               fcs.consumeIteratively(pq, &messageViews, 0)
-               return
-       }
-       // Group messages by messageGroup
-       messageViewsGroupByMessageGroup := make(map[string][]*MessageView)
-       messageViewsWithoutMessageGroup := make([]*MessageView, 0)
+// groupMessageBy groups messages by applying the provided groupKeyExtractor 
function
+// It returns two maps: grouped messages and messages without a group key
+func groupMessageBy(messageViews []*MessageView, groupKeyExtractor 
func(*MessageView) string) (map[string][]*MessageView, []*MessageView) {
+       messageViewsGroupByGroupKey := make(map[string][]*MessageView)
+       messageViewsWithoutGroupKey := make([]*MessageView, 0)
+
        for _, messageView := range messageViews {
-               messageGroup := messageView.GetMessageGroup()
-               if messageGroup != nil && *messageGroup != "" {
-                       messageViewsGroupByMessageGroup[*messageGroup] = 
append(messageViewsGroupByMessageGroup[*messageGroup], messageView)
+               groupKey := groupKeyExtractor(messageView)
+               if groupKey != "" {
+                       messageViewsGroupByGroupKey[groupKey] = 
append(messageViewsGroupByGroupKey[groupKey], messageView)
                } else {
-                       messageViewsWithoutMessageGroup = 
append(messageViewsWithoutMessageGroup, messageView)
+                       messageViewsWithoutGroupKey = 
append(messageViewsWithoutGroupKey, messageView)
                }
        }
 
-       groupNum := len(messageViewsGroupByMessageGroup)
-       if len(messageViewsWithoutMessageGroup) > 0 {
+       groupNum := len(messageViewsGroupByGroupKey)
+       if len(messageViewsWithoutGroupKey) > 0 {
                groupNum++
        }
        sugarBaseLogger.Debugf("FifoConsumeService parallel consume, 
messageViewsNum=%d, groupNum=%d", len(messageViews), groupNum)
 
+       return messageViewsGroupByGroupKey, messageViewsWithoutGroupKey
+}
+
+func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews 
[]*MessageView) {
+       if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
+               fcs.consumeIteratively(pq, &messageViews, 0)
+               return
+       }
+
+       messageViewsGroupByGroupKey, messageViewsWithoutGroupKey := 
groupMessageBy(
+               messageViews,
+               messageGroupExtractor,
+       )
+
        // Consume messages in parallel by group
-       for _, group := range messageViewsGroupByMessageGroup {
+       for _, group := range messageViewsGroupByGroupKey {
                fcs.consumeIteratively(pq, &group, 0)
        }
-       if len(messageViewsWithoutMessageGroup) > 0 {
-               fcs.consumeIteratively(pq, &messageViewsWithoutMessageGroup, 0)
+       if len(messageViewsWithoutGroupKey) > 0 {
+               fcs.consumeIteratively(pq, &messageViewsWithoutGroupKey, 0)
        }
 }
+
 func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue, 
messageViewsPtr *[]*MessageView, ptr int) {
        if messageViewsPtr == nil {
                sugarBaseLogger.Errorf("[Bug] messageViews is nil when 
consumeIteratively")
@@ -192,7 +221,82 @@ func (fcs *fifoConsumeService) consumeIteratively(pq 
ProcessQueue, messageViewsP
 
 func NewFiFoConsumeService(clientId string, messageListener MessageListener, 
consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor, 
enableFifoConsumeAccelerator bool) *fifoConsumeService {
        return &fifoConsumeService{
-               baseConsumeService:            *NewBaseConsumeService(clientId, 
messageListener, consumptionExecutor, messageInterceptor),
-               enableFifoConsumeAccelerator:  enableFifoConsumeAccelerator,
+               baseConsumeService:           *NewBaseConsumeService(clientId, 
messageListener, consumptionExecutor, messageInterceptor),
+               enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
+       }
+}
+
+// liteFifoConsumeService is a fifoConsumeService that used for lite push 
consumer
+type liteFifoConsumeService struct {
+       baseConsumeService
+       enableFifoConsumeAccelerator bool
+}
+
+func (lcs *liteFifoConsumeService) consume(pq ProcessQueue, messageViews 
[]*MessageView) {
+       if !lcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
+               lcs.consumeIteratively(pq, &messageViews, 0)
+               return
+       }
+
+       messageViewsGroupByGroupKey, messageViewsWithoutGroupKey := 
groupMessageBy(
+               messageViews,
+               liteTopicExtractor,
+       )
+
+       for _, group := range messageViewsGroupByGroupKey {
+               lcs.consumeIteratively(pq, &group, 0)
+       }
+       if len(messageViewsWithoutGroupKey) > 0 {
+               lcs.consumeIteratively(pq, &messageViewsWithoutGroupKey, 0)
+       }
+}
+
+func (lcs *liteFifoConsumeService) consumeIteratively(pq ProcessQueue, 
messageViewsPtr *[]*MessageView, ptr int) {
+       if messageViewsPtr == nil {
+               sugarBaseLogger.Errorf("[Bug] messageViews is nil when 
consumeIteratively")
+               return
+       }
+       messageViews := *messageViewsPtr
+       if ptr >= len(messageViews) {
+               return
+       }
+       mv := messageViews[ptr]
+       if mv.isCorrupted() {
+               sugarBaseLogger.Errorf("Message is corrupted for FIFO 
consumption, prepare to discard it, mq=%s, messageId=%s, clientId=%s", 
pq.getMessageQueue().String(), mv.GetMessageId(), lcs.clientId)
+               pq.discardFifoMessage(mv)
+               lcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
+               return
+       }
+       lcs.consumeImmediately(mv, func(result ConsumerResult, err error) {
+               if err != nil {
+                       sugarBaseLogger.Errorf("[Bug] Exception raised in 
consumption callback, clientId=%s", lcs.clientId)
+                       return
+               }
+               pq.eraseFifoMessage(mv, result)
+               if result.Type == ConsumerResultTypeSuspend {
+                       // Suspend all messages with the same liteTopic in this 
batch
+                       newMsgList := make([]*MessageView, 0)
+                       for i := ptr + 1; i < len(messageViews); i++ {
+                               msgView := messageViews[i]
+                               if msgView.GetLiteTopic() == mv.GetLiteTopic() {
+                                       pq.eraseFifoMessage(msgView, result)
+                               } else {
+                                       newMsgList = append(newMsgList, msgView)
+                               }
+                       }
+                       // Continue processing remaining messages with 
different liteTopic
+                       if len(newMsgList) > 0 {
+                               lcs.consumeIteratively(pq, &newMsgList, 0)
+                       }
+               } else {
+                       lcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
+               }
+       })
+}
+
+func NewLiteFifoConsumeService(clientId string, messageListener 
MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor 
MessageInterceptor, enableFifoConsumeAccelerator bool) *liteFifoConsumeService {
+       return &liteFifoConsumeService{
+               baseConsumeService:           *NewBaseConsumeService(clientId, 
messageListener, consumptionExecutor, messageInterceptor),
+               enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
        }
 }
diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index 3fa620fe..e78cd1b4 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -107,27 +107,27 @@ func (lpc *defaultLitePushConsumer) 
notifyUnsubscribeLite(command *v2.NotifyUnsu
        lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
 }
 
-func (lpc *defaultLitePushConsumer) SubscribeLite(topic string) error {
+func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string) error {
        if err := lpc.checkRunning(); err != nil {
                return err
        }
-       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{topic}); err != nil {
-               sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite topic:%s 
err:%v", topic, err)
+       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}); err != nil {
+               sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite 
liteTopic:%s err:%v", liteTopic, err)
                return err
        }
-       lpc.litePushConsumerSettings.liteTopicSet.Store(topic, struct{}{})
+       lpc.litePushConsumerSettings.liteTopicSet.Store(liteTopic, struct{}{})
        return nil
 }
 
-func (lpc *defaultLitePushConsumer) UnSubscribeLite(topic string) error {
+func (lpc *defaultLitePushConsumer) UnSubscribeLite(liteTopic string) error {
        if err := lpc.checkRunning(); err != nil {
                return err
        }
-       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{topic}); err != nil {
-               sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite 
topic:%s err:%v", topic, err)
+       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}); err != nil {
+               sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite 
liteTopic:%s err:%v", liteTopic, err)
                return err
        }
-       lpc.litePushConsumerSettings.liteTopicSet.Delete(topic)
+       lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
        return nil
 }
 
@@ -147,9 +147,10 @@ func (lpc *defaultLitePushConsumer) 
syncAllLiteSubscription() {
                }
                return true
        })
-       if len(liteTopicSet) == 0 {
-               return
-       }
+       // Sync subscription even when liteTopicSet is empty to keep server 
state consistent
+       //if len(liteTopicSet) == 0 {
+       //      return
+       //}
        if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet); err != nil {
                sugarBaseLogger.Errorf("LitePushConsumer 
syncAllLiteSubscription:%v,  err:%v", liteTopicSet, err)
        }
diff --git a/golang/process_queue.go b/golang/process_queue.go
index 274af8e3..63e709b2 100644
--- a/golang/process_queue.go
+++ b/golang/process_queue.go
@@ -71,6 +71,8 @@ func (dpq *defaultProcessQueue) discardFifoMessage(mv 
*MessageView) {
 }
 
 func (dpq *defaultProcessQueue) eraseFifoMessage(mv *MessageView, result 
ConsumerResult) {
+       result = dpq.convertSuspendResultIfNeeded(result)
+
        retryPolicy := dpq.consumer.pcSettings.GetRetryPolicy()
        maxAttempts := retryPolicy.MaxAttempts
        attempt := mv.GetMessageCommon().deliveryAttempt
@@ -78,7 +80,8 @@ func (dpq *defaultProcessQueue) eraseFifoMessage(mv 
*MessageView, result Consume
        service := dpq.consumer.consumerService
        clientId := dpq.consumer.cli.clientID
 
-       if result == FAILURE && attempt < maxAttempts {
+       // Handle FAILURE result with retry
+       if result.Type == ConsumerResultTypeFailure && attempt < maxAttempts {
                nextAttemptDelay := utils.GetNextAttemptDelay(retryPolicy, 
int(attempt))
                mv.deliveryAttempt += 1
                attempt = mv.deliveryAttempt
@@ -91,16 +94,36 @@ func (dpq *defaultProcessQueue) eraseFifoMessage(mv 
*MessageView, result Consume
                return
        }
 
-       if result != SUCCESS {
-               dpq.consumer.cli.log.Infof("Failed to consume fifo message 
finally, run out of attempt times, maxAttempts=%d, "+
-                       "attempt=%d, mq=%s, messageId=%s, clientId=%s", 
maxAttempts, attempt, dpq.mqstr, messageId, clientId)
-       }
-       // Ack message or forward it to DLQ depends on consumption result.
-       if result == SUCCESS {
+       // Handle SUCCESS result
+       if result.Type == ConsumerResultTypeSuccess {
                dpq.ackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
-       } else {
-               dpq.forwardToDeadLetterQueue(mv, func(error) { 
dpq.evictCacheMessage(mv) })
+               return
+       }
+
+       // Handle SUSPEND result
+       if result.Type == ConsumerResultTypeSuspend {
+               dpq.consumer.cli.log.Infof("Suspend consumption, 
consumerGroup=%s, topic=%s, liteTopic=%s, messageId=%s, suspendTime=%v",
+                       dpq.consumer.groupName, mv.topic, mv.GetLiteTopic(), 
messageId, result.suspendTime)
+               dpq.changeInvisibleDuration(mv, result.suspendTime, 1, 
func(error) { dpq.evictCacheMessage(mv) })
+               return
        }
+
+       // Handle FAILURE result without retry (final failure)
+       dpq.consumer.cli.log.Infof("Failed to consume fifo message finally, run 
out of attempt times, maxAttempts=%d, "+
+               "attempt=%d, mq=%s, messageId=%s, clientId=%s", maxAttempts, 
attempt, dpq.mqstr, messageId, clientId)
+       dpq.forwardToDeadLetterQueue(mv, func(error) { 
dpq.evictCacheMessage(mv) })
+}
+
+func (dpq *defaultProcessQueue) convertSuspendResultIfNeeded(result 
ConsumerResult) ConsumerResult {
+       if result.Type == ConsumerResultTypeSuspend {
+               if dpq.consumer.pcSettings.clientType != 
v2.ClientType_LITE_PUSH_CONSUMER {
+                       dpq.consumer.cli.log.Warnf("Only LitePushConsumer 
supports ConsumeResultSuspend! "+
+                               "Convert to FAILURE, consumerGroup=%s, 
consumerType=%v",
+                               dpq.consumer.groupName, 
dpq.consumer.pcSettings.clientType)
+                       return FAILURE
+               }
+       }
+       return result
 }
 
 func (dpq *defaultProcessQueue) forwardToDeadLetterQueue(mv *MessageView, 
callback func(error)) {
@@ -162,14 +185,14 @@ func (dpq *defaultProcessQueue) 
forwardToDeadLetterQueueLater(mv *MessageView, a
 }
 
 func (dpq *defaultProcessQueue) eraseMessage(mv *MessageView, consumeResult 
ConsumerResult) {
-       if consumeResult == SUCCESS {
+       consumeResult = dpq.convertSuspendResultIfNeeded(consumeResult)
+       if consumeResult.Type == ConsumerResultTypeSuccess || 
consumeResult.Type == ConsumerResultTypeSuspend {
                dpq.consumer.consumptionOkQuantity.Inc()
                dpq.ackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
        } else {
                dpq.consumer.consumptionErrorQuantity.Inc()
                dpq.nackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
        }
-
 }
 
 func (dpq *defaultProcessQueue) discardMessage(mv *MessageView) {
diff --git a/golang/protocol/v2/admin.pb.go b/golang/protocol/v2/admin.pb.go
index ac8d6d98..0a2c2610 100644
--- a/golang/protocol/v2/admin.pb.go
+++ b/golang/protocol/v2/admin.pb.go
@@ -15,18 +15,19 @@
 
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-//     protoc-gen-go v1.36.8
-//     protoc        v5.29.3
+//     protoc-gen-go v1.36.11
+//     protoc        v7.34.0
 // source: apache/rocketmq/v2/admin.proto
 
 package v2
 
 import (
-       protoreflect "google.golang.org/protobuf/reflect/protoreflect"
-       protoimpl "google.golang.org/protobuf/runtime/protoimpl"
        reflect "reflect"
        sync "sync"
        unsafe "unsafe"
+
+       protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+       protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 )
 
 const (
diff --git a/golang/protocol/v2/admin_grpc.pb.go 
b/golang/protocol/v2/admin_grpc.pb.go
index 0ea7cc2f..4d2ba3a5 100644
--- a/golang/protocol/v2/admin_grpc.pb.go
+++ b/golang/protocol/v2/admin_grpc.pb.go
@@ -15,14 +15,15 @@
 
 // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
 // versions:
-// - protoc-gen-go-grpc v1.5.1
-// - protoc             v5.29.3
+// - protoc-gen-go-grpc v1.6.1
+// - protoc             v7.34.0
 // source: apache/rocketmq/v2/admin.proto
 
 package v2
 
 import (
        context "context"
+
        grpc "google.golang.org/grpc"
        codes "google.golang.org/grpc/codes"
        status "google.golang.org/grpc/status"
@@ -78,7 +79,7 @@ type AdminServer interface {
 type UnimplementedAdminServer struct{}
 
 func (UnimplementedAdminServer) ChangeLogLevel(context.Context, 
*ChangeLogLevelRequest) (*ChangeLogLevelResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method ChangeLogLevel 
not implemented")
+       return nil, status.Error(codes.Unimplemented, "method ChangeLogLevel 
not implemented")
 }
 func (UnimplementedAdminServer) mustEmbedUnimplementedAdminServer() {}
 func (UnimplementedAdminServer) testEmbeddedByValue()               {}
@@ -91,7 +92,7 @@ type UnsafeAdminServer interface {
 }
 
 func RegisterAdminServer(s grpc.ServiceRegistrar, srv AdminServer) {
-       // If the following call pancis, it indicates UnimplementedAdminServer 
was
+       // If the following call panics, it indicates UnimplementedAdminServer 
was
        // embedded by pointer and is nil.  This will cause panics if an
        // unimplemented method is ever invoked, so we test this at 
initialization
        // time to prevent it from happening at runtime later due to I/O.
diff --git a/golang/protocol/v2/definition.pb.go 
b/golang/protocol/v2/definition.pb.go
index 37977be8..4460b6cf 100644
--- a/golang/protocol/v2/definition.pb.go
+++ b/golang/protocol/v2/definition.pb.go
@@ -15,20 +15,21 @@
 
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-//     protoc-gen-go v1.36.8
-//     protoc        v5.29.3
+//     protoc-gen-go v1.36.11
+//     protoc        v7.34.0
 // source: apache/rocketmq/v2/definition.proto
 
 package v2
 
 import (
+       reflect "reflect"
+       sync "sync"
+       unsafe "unsafe"
+
        protoreflect "google.golang.org/protobuf/reflect/protoreflect"
        protoimpl "google.golang.org/protobuf/runtime/protoimpl"
        durationpb "google.golang.org/protobuf/types/known/durationpb"
        timestamppb "google.golang.org/protobuf/types/known/timestamppb"
-       reflect "reflect"
-       sync "sync"
-       unsafe "unsafe"
 )
 
 const (
@@ -306,6 +307,8 @@ const (
        MessageType_TRANSACTION MessageType = 4
        // lite topic
        MessageType_LITE MessageType = 5
+       // Messages that lower prioritised ones may need to wait for higher 
priority messages to be processed first
+       MessageType_PRIORITY MessageType = 6
 )
 
 // Enum value maps for MessageType.
@@ -317,6 +320,7 @@ var (
                3: "DELAY",
                4: "TRANSACTION",
                5: "LITE",
+               6: "PRIORITY",
        }
        MessageType_value = map[string]int32{
                "MESSAGE_TYPE_UNSPECIFIED": 0,
@@ -325,6 +329,7 @@ var (
                "DELAY":                    3,
                "TRANSACTION":              4,
                "LITE":                     5,
+               "PRIORITY":                 6,
        }
 )
 
@@ -873,13 +878,9 @@ func (Language) EnumDescriptor() ([]byte, []int) {
 type LiteSubscriptionAction int32
 
 const (
-       // incremental add
-       LiteSubscriptionAction_PARTIAL_ADD LiteSubscriptionAction = 0
-       // incremental remove
-       LiteSubscriptionAction_PARTIAL_REMOVE LiteSubscriptionAction = 1
-       // all add
-       LiteSubscriptionAction_COMPLETE_ADD LiteSubscriptionAction = 2
-       // add remove
+       LiteSubscriptionAction_PARTIAL_ADD     LiteSubscriptionAction = 0
+       LiteSubscriptionAction_PARTIAL_REMOVE  LiteSubscriptionAction = 1
+       LiteSubscriptionAction_COMPLETE_ADD    LiteSubscriptionAction = 2
        LiteSubscriptionAction_COMPLETE_REMOVE LiteSubscriptionAction = 3
 )
 
@@ -978,6 +979,55 @@ func (QueryOffsetPolicy) EnumDescriptor() ([]byte, []int) {
        return file_apache_rocketmq_v2_definition_proto_rawDescGZIP(), []int{12}
 }
 
+type OffsetOption_Policy int32
+
+const (
+       OffsetOption_LAST OffsetOption_Policy = 0
+       OffsetOption_MIN  OffsetOption_Policy = 1
+       OffsetOption_MAX  OffsetOption_Policy = 2
+)
+
+// Enum value maps for OffsetOption_Policy.
+var (
+       OffsetOption_Policy_name = map[int32]string{
+               0: "LAST",
+               1: "MIN",
+               2: "MAX",
+       }
+       OffsetOption_Policy_value = map[string]int32{
+               "LAST": 0,
+               "MIN":  1,
+               "MAX":  2,
+       }
+)
+
+func (x OffsetOption_Policy) Enum() *OffsetOption_Policy {
+       p := new(OffsetOption_Policy)
+       *p = x
+       return p
+}
+
+func (x OffsetOption_Policy) String() string {
+       return protoimpl.X.EnumStringOf(x.Descriptor(), 
protoreflect.EnumNumber(x))
+}
+
+func (OffsetOption_Policy) Descriptor() protoreflect.EnumDescriptor {
+       return 
file_apache_rocketmq_v2_definition_proto_enumTypes[13].Descriptor()
+}
+
+func (OffsetOption_Policy) Type() protoreflect.EnumType {
+       return &file_apache_rocketmq_v2_definition_proto_enumTypes[13]
+}
+
+func (x OffsetOption_Policy) Number() protoreflect.EnumNumber {
+       return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use OffsetOption_Policy.Descriptor instead.
+func (OffsetOption_Policy) EnumDescriptor() ([]byte, []int) {
+       return file_apache_rocketmq_v2_definition_proto_rawDescGZIP(), 
[]int{21, 0}
+}
+
 type FilterExpression struct {
        state         protoimpl.MessageState `protogen:"open.v1"`
        Type          FilterType             
`protobuf:"varint,1,opt,name=type,proto3,enum=apache.rocketmq.v2.FilterType" 
json:"type,omitempty"`
@@ -1700,7 +1750,9 @@ type SystemProperties struct {
        // Information to identify whether this message is from dead letter 
queue.
        DeadLetterQueue *DeadLetterQueue 
`protobuf:"bytes,20,opt,name=dead_letter_queue,json=deadLetterQueue,proto3,oneof"
 json:"dead_letter_queue,omitempty"`
        // lite topic
-       LiteTopic     *string 
`protobuf:"bytes,21,opt,name=lite_topic,json=liteTopic,proto3,oneof" 
json:"lite_topic,omitempty"`
+       LiteTopic *string 
`protobuf:"bytes,21,opt,name=lite_topic,json=liteTopic,proto3,oneof" 
json:"lite_topic,omitempty"`
+       // Priority of message, which is optional
+       Priority      *int32 
`protobuf:"varint,22,opt,name=priority,proto3,oneof" json:"priority,omitempty"`
        unknownFields protoimpl.UnknownFields
        sizeCache     protoimpl.SizeCache
 }
@@ -1882,6 +1934,13 @@ func (x *SystemProperties) GetLiteTopic() string {
        return ""
 }
 
+func (x *SystemProperties) GetPriority() int32 {
+       if x != nil && x.Priority != nil {
+               return *x.Priority
+       }
+       return 0
+}
+
 type DeadLetterQueue struct {
        state protoimpl.MessageState `protogen:"open.v1"`
        // Original topic for this DLQ message.
@@ -2556,6 +2615,120 @@ func (x *Metric) GetEndpoints() *Endpoints {
        return nil
 }
 
+type OffsetOption struct {
+       state protoimpl.MessageState `protogen:"open.v1"`
+       // Types that are valid to be assigned to OffsetType:
+       //
+       //      *OffsetOption_Policy_
+       //      *OffsetOption_Offset
+       //      *OffsetOption_TailN
+       //      *OffsetOption_Timestamp
+       OffsetType    isOffsetOption_OffsetType `protobuf_oneof:"offset_type"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *OffsetOption) Reset() {
+       *x = OffsetOption{}
+       mi := &file_apache_rocketmq_v2_definition_proto_msgTypes[21]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *OffsetOption) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*OffsetOption) ProtoMessage() {}
+
+func (x *OffsetOption) ProtoReflect() protoreflect.Message {
+       mi := &file_apache_rocketmq_v2_definition_proto_msgTypes[21]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use OffsetOption.ProtoReflect.Descriptor instead.
+func (*OffsetOption) Descriptor() ([]byte, []int) {
+       return file_apache_rocketmq_v2_definition_proto_rawDescGZIP(), []int{21}
+}
+
+func (x *OffsetOption) GetOffsetType() isOffsetOption_OffsetType {
+       if x != nil {
+               return x.OffsetType
+       }
+       return nil
+}
+
+func (x *OffsetOption) GetPolicy() OffsetOption_Policy {
+       if x != nil {
+               if x, ok := x.OffsetType.(*OffsetOption_Policy_); ok {
+                       return x.Policy
+               }
+       }
+       return OffsetOption_LAST
+}
+
+func (x *OffsetOption) GetOffset() int64 {
+       if x != nil {
+               if x, ok := x.OffsetType.(*OffsetOption_Offset); ok {
+                       return x.Offset
+               }
+       }
+       return 0
+}
+
+func (x *OffsetOption) GetTailN() int64 {
+       if x != nil {
+               if x, ok := x.OffsetType.(*OffsetOption_TailN); ok {
+                       return x.TailN
+               }
+       }
+       return 0
+}
+
+func (x *OffsetOption) GetTimestamp() int64 {
+       if x != nil {
+               if x, ok := x.OffsetType.(*OffsetOption_Timestamp); ok {
+                       return x.Timestamp
+               }
+       }
+       return 0
+}
+
+type isOffsetOption_OffsetType interface {
+       isOffsetOption_OffsetType()
+}
+
+type OffsetOption_Policy_ struct {
+       Policy OffsetOption_Policy 
`protobuf:"varint,1,opt,name=policy,proto3,enum=apache.rocketmq.v2.OffsetOption_Policy,oneof"`
+}
+
+type OffsetOption_Offset struct {
+       Offset int64 `protobuf:"varint,2,opt,name=offset,proto3,oneof"`
+}
+
+type OffsetOption_TailN struct {
+       TailN int64 
`protobuf:"varint,3,opt,name=tail_n,json=tailN,proto3,oneof"`
+}
+
+type OffsetOption_Timestamp struct {
+       Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3,oneof"`
+}
+
+func (*OffsetOption_Policy_) isOffsetOption_OffsetType() {}
+
+func (*OffsetOption_Offset) isOffsetOption_OffsetType() {}
+
+func (*OffsetOption_TailN) isOffsetOption_OffsetType() {}
+
+func (*OffsetOption_Timestamp) isOffsetOption_OffsetType() {}
+
 var File_apache_rocketmq_v2_definition_proto protoreflect.FileDescriptor
 
 const file_apache_rocketmq_v2_definition_proto_rawDesc = "" +
@@ -2608,8 +2781,7 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc = 
"" +
        "\x14accept_message_types\x18\x05 
\x03(\x0e2\x1f.apache.rocketmq.v2.MessageTypeR\x12acceptMessageTypes\"X\n" +
        "\x06Digest\x122\n" +
        "\x04type\x18\x01 
\x01(\x0e2\x1e.apache.rocketmq.v2.DigestTypeR\x04type\x12\x1a\n" +
-       "\bchecksum\x18\x02 \x01(\tR\bchecksum\"\xe1\n" +
-       "\n" +
+       "\bchecksum\x18\x02 \x01(\tR\bchecksum\"\x8f\v\n" +
        "\x10SystemProperties\x12\x15\n" +
        "\x03tag\x18\x01 \x01(\tH\x00R\x03tag\x88\x01\x01\x12\x12\n" +
        "\x04keys\x18\x02 \x03(\tR\x04keys\x12\x1d\n" +
@@ -2637,7 +2809,8 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc = 
"" +
        "\x11dead_letter_queue\x18\x14 
\x01(\v2#.apache.rocketmq.v2.DeadLetterQueueH\n" +
        "R\x0fdeadLetterQueue\x88\x01\x01\x12\"\n" +
        "\n" +
-       "lite_topic\x18\x15 \x01(\tH\vR\tliteTopic\x88\x01\x01B\x06\n" +
+       "lite_topic\x18\x15 \x01(\tH\vR\tliteTopic\x88\x01\x01\x12\x1f\n" +
+       "\bpriority\x18\x16 \x01(\x05H\fR\bpriority\x88\x01\x01B\x06\n" +
        "\x04_tagB\x12\n" +
        "\x10_store_timestampB\x15\n" +
        "\x13_delivery_timestampB\x11\n" +
@@ -2649,7 +2822,8 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc = 
"" +
        "\x0e_trace_contextB)\n" +
        "'_orphaned_transaction_recovery_durationB\x14\n" +
        "\x12_dead_letter_queueB\r\n" +
-       "\v_lite_topic\"F\n" +
+       "\v_lite_topicB\v\n" +
+       "\t_priority\"F\n" +
        "\x0fDeadLetterQueue\x12\x14\n" +
        "\x05topic\x18\x01 \x01(\tR\x05topic\x12\x1d\n" +
        "\n" +
@@ -2714,7 +2888,17 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc = 
"" +
        "\x02on\x18\x01 \x01(\bR\x02on\x12@\n" +
        "\tendpoints\x18\x02 
\x01(\v2\x1d.apache.rocketmq.v2.EndpointsH\x00R\tendpoints\x88\x01\x01B\f\n" +
        "\n" +
-       "_endpoints*Y\n" +
+       "_endpoints\"\xd9\x01\n" +
+       "\fOffsetOption\x12A\n" +
+       "\x06policy\x18\x01 
\x01(\x0e2'.apache.rocketmq.v2.OffsetOption.PolicyH\x00R\x06policy\x12\x18\n" +
+       "\x06offset\x18\x02 \x01(\x03H\x00R\x06offset\x12\x17\n" +
+       "\x06tail_n\x18\x03 \x01(\x03H\x00R\x05tailN\x12\x1e\n" +
+       "\ttimestamp\x18\x04 \x01(\x03H\x00R\ttimestamp\"$\n" +
+       "\x06Policy\x12\b\n" +
+       "\x04LAST\x10\x00\x12\a\n" +
+       "\x03MIN\x10\x01\x12\a\n" +
+       "\x03MAX\x10\x02B\r\n" +
+       "\voffset_type*Y\n" +
        "\x15TransactionResolution\x12&\n" +
        "\"TRANSACTION_RESOLUTION_UNSPECIFIED\x10\x00\x12\n" +
        "\n" +
@@ -2741,7 +2925,7 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc = 
"" +
        "\x1aADDRESS_SCHEME_UNSPECIFIED\x10\x00\x12\b\n" +
        "\x04IPv4\x10\x01\x12\b\n" +
        "\x04IPv6\x10\x02\x12\x0f\n" +
-       "\vDOMAIN_NAME\x10\x03*g\n" +
+       "\vDOMAIN_NAME\x10\x03*u\n" +
        "\vMessageType\x12\x1c\n" +
        "\x18MESSAGE_TYPE_UNSPECIFIED\x10\x00\x12\n" +
        "\n" +
@@ -2749,7 +2933,8 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc = 
"" +
        "\x04FIFO\x10\x02\x12\t\n" +
        "\x05DELAY\x10\x03\x12\x0f\n" +
        "\vTRANSACTION\x10\x04\x12\b\n" +
-       "\x04LITE\x10\x05*G\n" +
+       "\x04LITE\x10\x05\x12\f\n" +
+       "\bPRIORITY\x10\x06*G\n" +
        "\n" +
        "DigestType\x12\x1b\n" +
        "\x17DIGEST_TYPE_UNSPECIFIED\x10\x00\x12\t\n" +
@@ -2864,8 +3049,8 @@ func 
file_apache_rocketmq_v2_definition_proto_rawDescGZIP() []byte {
        return file_apache_rocketmq_v2_definition_proto_rawDescData
 }
 
-var file_apache_rocketmq_v2_definition_proto_enumTypes = 
make([]protoimpl.EnumInfo, 13)
-var file_apache_rocketmq_v2_definition_proto_msgTypes = 
make([]protoimpl.MessageInfo, 22)
+var file_apache_rocketmq_v2_definition_proto_enumTypes = 
make([]protoimpl.EnumInfo, 14)
+var file_apache_rocketmq_v2_definition_proto_msgTypes = 
make([]protoimpl.MessageInfo, 23)
 var file_apache_rocketmq_v2_definition_proto_goTypes = []any{
        (TransactionResolution)(0),    // 0: 
apache.rocketmq.v2.TransactionResolution
        (TransactionSource)(0),        // 1: 
apache.rocketmq.v2.TransactionSource
@@ -2880,81 +3065,84 @@ var file_apache_rocketmq_v2_definition_proto_goTypes = 
[]any{
        (Language)(0),                 // 10: apache.rocketmq.v2.Language
        (LiteSubscriptionAction)(0),   // 11: 
apache.rocketmq.v2.LiteSubscriptionAction
        (QueryOffsetPolicy)(0),        // 12: 
apache.rocketmq.v2.QueryOffsetPolicy
-       (*FilterExpression)(nil),      // 13: 
apache.rocketmq.v2.FilterExpression
-       (*RetryPolicy)(nil),           // 14: apache.rocketmq.v2.RetryPolicy
-       (*ExponentialBackoff)(nil),    // 15: 
apache.rocketmq.v2.ExponentialBackoff
-       (*CustomizedBackoff)(nil),     // 16: 
apache.rocketmq.v2.CustomizedBackoff
-       (*Resource)(nil),              // 17: apache.rocketmq.v2.Resource
-       (*SubscriptionEntry)(nil),     // 18: 
apache.rocketmq.v2.SubscriptionEntry
-       (*Address)(nil),               // 19: apache.rocketmq.v2.Address
-       (*Endpoints)(nil),             // 20: apache.rocketmq.v2.Endpoints
-       (*Broker)(nil),                // 21: apache.rocketmq.v2.Broker
-       (*MessageQueue)(nil),          // 22: apache.rocketmq.v2.MessageQueue
-       (*Digest)(nil),                // 23: apache.rocketmq.v2.Digest
-       (*SystemProperties)(nil),      // 24: 
apache.rocketmq.v2.SystemProperties
-       (*DeadLetterQueue)(nil),       // 25: apache.rocketmq.v2.DeadLetterQueue
-       (*Message)(nil),               // 26: apache.rocketmq.v2.Message
-       (*Assignment)(nil),            // 27: apache.rocketmq.v2.Assignment
-       (*Status)(nil),                // 28: apache.rocketmq.v2.Status
-       (*UA)(nil),                    // 29: apache.rocketmq.v2.UA
-       (*Settings)(nil),              // 30: apache.rocketmq.v2.Settings
-       (*Publishing)(nil),            // 31: apache.rocketmq.v2.Publishing
-       (*Subscription)(nil),          // 32: apache.rocketmq.v2.Subscription
-       (*Metric)(nil),                // 33: apache.rocketmq.v2.Metric
-       nil,                           // 34: 
apache.rocketmq.v2.Message.UserPropertiesEntry
-       (*durationpb.Duration)(nil),   // 35: google.protobuf.Duration
-       (*timestamppb.Timestamp)(nil), // 36: google.protobuf.Timestamp
+       (OffsetOption_Policy)(0),      // 13: 
apache.rocketmq.v2.OffsetOption.Policy
+       (*FilterExpression)(nil),      // 14: 
apache.rocketmq.v2.FilterExpression
+       (*RetryPolicy)(nil),           // 15: apache.rocketmq.v2.RetryPolicy
+       (*ExponentialBackoff)(nil),    // 16: 
apache.rocketmq.v2.ExponentialBackoff
+       (*CustomizedBackoff)(nil),     // 17: 
apache.rocketmq.v2.CustomizedBackoff
+       (*Resource)(nil),              // 18: apache.rocketmq.v2.Resource
+       (*SubscriptionEntry)(nil),     // 19: 
apache.rocketmq.v2.SubscriptionEntry
+       (*Address)(nil),               // 20: apache.rocketmq.v2.Address
+       (*Endpoints)(nil),             // 21: apache.rocketmq.v2.Endpoints
+       (*Broker)(nil),                // 22: apache.rocketmq.v2.Broker
+       (*MessageQueue)(nil),          // 23: apache.rocketmq.v2.MessageQueue
+       (*Digest)(nil),                // 24: apache.rocketmq.v2.Digest
+       (*SystemProperties)(nil),      // 25: 
apache.rocketmq.v2.SystemProperties
+       (*DeadLetterQueue)(nil),       // 26: apache.rocketmq.v2.DeadLetterQueue
+       (*Message)(nil),               // 27: apache.rocketmq.v2.Message
+       (*Assignment)(nil),            // 28: apache.rocketmq.v2.Assignment
+       (*Status)(nil),                // 29: apache.rocketmq.v2.Status
+       (*UA)(nil),                    // 30: apache.rocketmq.v2.UA
+       (*Settings)(nil),              // 31: apache.rocketmq.v2.Settings
+       (*Publishing)(nil),            // 32: apache.rocketmq.v2.Publishing
+       (*Subscription)(nil),          // 33: apache.rocketmq.v2.Subscription
+       (*Metric)(nil),                // 34: apache.rocketmq.v2.Metric
+       (*OffsetOption)(nil),          // 35: apache.rocketmq.v2.OffsetOption
+       nil,                           // 36: 
apache.rocketmq.v2.Message.UserPropertiesEntry
+       (*durationpb.Duration)(nil),   // 37: google.protobuf.Duration
+       (*timestamppb.Timestamp)(nil), // 38: google.protobuf.Timestamp
 }
 var file_apache_rocketmq_v2_definition_proto_depIdxs = []int32{
        3,  // 0: apache.rocketmq.v2.FilterExpression.type:type_name -> 
apache.rocketmq.v2.FilterType
-       15, // 1: apache.rocketmq.v2.RetryPolicy.exponential_backoff:type_name 
-> apache.rocketmq.v2.ExponentialBackoff
-       16, // 2: apache.rocketmq.v2.RetryPolicy.customized_backoff:type_name 
-> apache.rocketmq.v2.CustomizedBackoff
-       35, // 3: apache.rocketmq.v2.ExponentialBackoff.initial:type_name -> 
google.protobuf.Duration
-       35, // 4: apache.rocketmq.v2.ExponentialBackoff.max:type_name -> 
google.protobuf.Duration
-       35, // 5: apache.rocketmq.v2.CustomizedBackoff.next:type_name -> 
google.protobuf.Duration
-       17, // 6: apache.rocketmq.v2.SubscriptionEntry.topic:type_name -> 
apache.rocketmq.v2.Resource
-       13, // 7: apache.rocketmq.v2.SubscriptionEntry.expression:type_name -> 
apache.rocketmq.v2.FilterExpression
+       16, // 1: apache.rocketmq.v2.RetryPolicy.exponential_backoff:type_name 
-> apache.rocketmq.v2.ExponentialBackoff
+       17, // 2: apache.rocketmq.v2.RetryPolicy.customized_backoff:type_name 
-> apache.rocketmq.v2.CustomizedBackoff
+       37, // 3: apache.rocketmq.v2.ExponentialBackoff.initial:type_name -> 
google.protobuf.Duration
+       37, // 4: apache.rocketmq.v2.ExponentialBackoff.max:type_name -> 
google.protobuf.Duration
+       37, // 5: apache.rocketmq.v2.CustomizedBackoff.next:type_name -> 
google.protobuf.Duration
+       18, // 6: apache.rocketmq.v2.SubscriptionEntry.topic:type_name -> 
apache.rocketmq.v2.Resource
+       14, // 7: apache.rocketmq.v2.SubscriptionEntry.expression:type_name -> 
apache.rocketmq.v2.FilterExpression
        4,  // 8: apache.rocketmq.v2.Endpoints.scheme:type_name -> 
apache.rocketmq.v2.AddressScheme
-       19, // 9: apache.rocketmq.v2.Endpoints.addresses:type_name -> 
apache.rocketmq.v2.Address
-       20, // 10: apache.rocketmq.v2.Broker.endpoints:type_name -> 
apache.rocketmq.v2.Endpoints
-       17, // 11: apache.rocketmq.v2.MessageQueue.topic:type_name -> 
apache.rocketmq.v2.Resource
+       20, // 9: apache.rocketmq.v2.Endpoints.addresses:type_name -> 
apache.rocketmq.v2.Address
+       21, // 10: apache.rocketmq.v2.Broker.endpoints:type_name -> 
apache.rocketmq.v2.Endpoints
+       18, // 11: apache.rocketmq.v2.MessageQueue.topic:type_name -> 
apache.rocketmq.v2.Resource
        2,  // 12: apache.rocketmq.v2.MessageQueue.permission:type_name -> 
apache.rocketmq.v2.Permission
-       21, // 13: apache.rocketmq.v2.MessageQueue.broker:type_name -> 
apache.rocketmq.v2.Broker
+       22, // 13: apache.rocketmq.v2.MessageQueue.broker:type_name -> 
apache.rocketmq.v2.Broker
        5,  // 14: 
apache.rocketmq.v2.MessageQueue.accept_message_types:type_name -> 
apache.rocketmq.v2.MessageType
        6,  // 15: apache.rocketmq.v2.Digest.type:type_name -> 
apache.rocketmq.v2.DigestType
-       23, // 16: apache.rocketmq.v2.SystemProperties.body_digest:type_name -> 
apache.rocketmq.v2.Digest
+       24, // 16: apache.rocketmq.v2.SystemProperties.body_digest:type_name -> 
apache.rocketmq.v2.Digest
        8,  // 17: apache.rocketmq.v2.SystemProperties.body_encoding:type_name 
-> apache.rocketmq.v2.Encoding
        5,  // 18: apache.rocketmq.v2.SystemProperties.message_type:type_name 
-> apache.rocketmq.v2.MessageType
-       36, // 19: apache.rocketmq.v2.SystemProperties.born_timestamp:type_name 
-> google.protobuf.Timestamp
-       36, // 20: 
apache.rocketmq.v2.SystemProperties.store_timestamp:type_name -> 
google.protobuf.Timestamp
-       36, // 21: 
apache.rocketmq.v2.SystemProperties.delivery_timestamp:type_name -> 
google.protobuf.Timestamp
-       35, // 22: 
apache.rocketmq.v2.SystemProperties.invisible_duration:type_name -> 
google.protobuf.Duration
-       35, // 23: 
apache.rocketmq.v2.SystemProperties.orphaned_transaction_recovery_duration:type_name
 -> google.protobuf.Duration
-       25, // 24: 
apache.rocketmq.v2.SystemProperties.dead_letter_queue:type_name -> 
apache.rocketmq.v2.DeadLetterQueue
-       17, // 25: apache.rocketmq.v2.Message.topic:type_name -> 
apache.rocketmq.v2.Resource
-       34, // 26: apache.rocketmq.v2.Message.user_properties:type_name -> 
apache.rocketmq.v2.Message.UserPropertiesEntry
-       24, // 27: apache.rocketmq.v2.Message.system_properties:type_name -> 
apache.rocketmq.v2.SystemProperties
-       22, // 28: apache.rocketmq.v2.Assignment.message_queue:type_name -> 
apache.rocketmq.v2.MessageQueue
+       38, // 19: apache.rocketmq.v2.SystemProperties.born_timestamp:type_name 
-> google.protobuf.Timestamp
+       38, // 20: 
apache.rocketmq.v2.SystemProperties.store_timestamp:type_name -> 
google.protobuf.Timestamp
+       38, // 21: 
apache.rocketmq.v2.SystemProperties.delivery_timestamp:type_name -> 
google.protobuf.Timestamp
+       37, // 22: 
apache.rocketmq.v2.SystemProperties.invisible_duration:type_name -> 
google.protobuf.Duration
+       37, // 23: 
apache.rocketmq.v2.SystemProperties.orphaned_transaction_recovery_duration:type_name
 -> google.protobuf.Duration
+       26, // 24: 
apache.rocketmq.v2.SystemProperties.dead_letter_queue:type_name -> 
apache.rocketmq.v2.DeadLetterQueue
+       18, // 25: apache.rocketmq.v2.Message.topic:type_name -> 
apache.rocketmq.v2.Resource
+       36, // 26: apache.rocketmq.v2.Message.user_properties:type_name -> 
apache.rocketmq.v2.Message.UserPropertiesEntry
+       25, // 27: apache.rocketmq.v2.Message.system_properties:type_name -> 
apache.rocketmq.v2.SystemProperties
+       23, // 28: apache.rocketmq.v2.Assignment.message_queue:type_name -> 
apache.rocketmq.v2.MessageQueue
        9,  // 29: apache.rocketmq.v2.Status.code:type_name -> 
apache.rocketmq.v2.Code
        10, // 30: apache.rocketmq.v2.UA.language:type_name -> 
apache.rocketmq.v2.Language
        7,  // 31: apache.rocketmq.v2.Settings.client_type:type_name -> 
apache.rocketmq.v2.ClientType
-       20, // 32: apache.rocketmq.v2.Settings.access_point:type_name -> 
apache.rocketmq.v2.Endpoints
-       14, // 33: apache.rocketmq.v2.Settings.backoff_policy:type_name -> 
apache.rocketmq.v2.RetryPolicy
-       35, // 34: apache.rocketmq.v2.Settings.request_timeout:type_name -> 
google.protobuf.Duration
-       31, // 35: apache.rocketmq.v2.Settings.publishing:type_name -> 
apache.rocketmq.v2.Publishing
-       32, // 36: apache.rocketmq.v2.Settings.subscription:type_name -> 
apache.rocketmq.v2.Subscription
-       29, // 37: apache.rocketmq.v2.Settings.user_agent:type_name -> 
apache.rocketmq.v2.UA
-       33, // 38: apache.rocketmq.v2.Settings.metric:type_name -> 
apache.rocketmq.v2.Metric
-       17, // 39: apache.rocketmq.v2.Publishing.topics:type_name -> 
apache.rocketmq.v2.Resource
-       17, // 40: apache.rocketmq.v2.Subscription.group:type_name -> 
apache.rocketmq.v2.Resource
-       18, // 41: apache.rocketmq.v2.Subscription.subscriptions:type_name -> 
apache.rocketmq.v2.SubscriptionEntry
-       35, // 42: 
apache.rocketmq.v2.Subscription.long_polling_timeout:type_name -> 
google.protobuf.Duration
-       20, // 43: apache.rocketmq.v2.Metric.endpoints:type_name -> 
apache.rocketmq.v2.Endpoints
-       44, // [44:44] is the sub-list for method output_type
-       44, // [44:44] is the sub-list for method input_type
-       44, // [44:44] is the sub-list for extension type_name
-       44, // [44:44] is the sub-list for extension extendee
-       0,  // [0:44] is the sub-list for field type_name
+       21, // 32: apache.rocketmq.v2.Settings.access_point:type_name -> 
apache.rocketmq.v2.Endpoints
+       15, // 33: apache.rocketmq.v2.Settings.backoff_policy:type_name -> 
apache.rocketmq.v2.RetryPolicy
+       37, // 34: apache.rocketmq.v2.Settings.request_timeout:type_name -> 
google.protobuf.Duration
+       32, // 35: apache.rocketmq.v2.Settings.publishing:type_name -> 
apache.rocketmq.v2.Publishing
+       33, // 36: apache.rocketmq.v2.Settings.subscription:type_name -> 
apache.rocketmq.v2.Subscription
+       30, // 37: apache.rocketmq.v2.Settings.user_agent:type_name -> 
apache.rocketmq.v2.UA
+       34, // 38: apache.rocketmq.v2.Settings.metric:type_name -> 
apache.rocketmq.v2.Metric
+       18, // 39: apache.rocketmq.v2.Publishing.topics:type_name -> 
apache.rocketmq.v2.Resource
+       18, // 40: apache.rocketmq.v2.Subscription.group:type_name -> 
apache.rocketmq.v2.Resource
+       19, // 41: apache.rocketmq.v2.Subscription.subscriptions:type_name -> 
apache.rocketmq.v2.SubscriptionEntry
+       37, // 42: 
apache.rocketmq.v2.Subscription.long_polling_timeout:type_name -> 
google.protobuf.Duration
+       21, // 43: apache.rocketmq.v2.Metric.endpoints:type_name -> 
apache.rocketmq.v2.Endpoints
+       13, // 44: apache.rocketmq.v2.OffsetOption.policy:type_name -> 
apache.rocketmq.v2.OffsetOption.Policy
+       45, // [45:45] is the sub-list for method output_type
+       45, // [45:45] is the sub-list for method input_type
+       45, // [45:45] is the sub-list for extension type_name
+       45, // [45:45] is the sub-list for extension extendee
+       0,  // [0:45] is the sub-list for field type_name
 }
 
 func init() { file_apache_rocketmq_v2_definition_proto_init() }
@@ -2973,13 +3161,19 @@ func file_apache_rocketmq_v2_definition_proto_init() {
        }
        file_apache_rocketmq_v2_definition_proto_msgTypes[19].OneofWrappers = 
[]any{}
        file_apache_rocketmq_v2_definition_proto_msgTypes[20].OneofWrappers = 
[]any{}
+       file_apache_rocketmq_v2_definition_proto_msgTypes[21].OneofWrappers = 
[]any{
+               (*OffsetOption_Policy_)(nil),
+               (*OffsetOption_Offset)(nil),
+               (*OffsetOption_TailN)(nil),
+               (*OffsetOption_Timestamp)(nil),
+       }
        type x struct{}
        out := protoimpl.TypeBuilder{
                File: protoimpl.DescBuilder{
                        GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
                        RawDescriptor: 
unsafe.Slice(unsafe.StringData(file_apache_rocketmq_v2_definition_proto_rawDesc),
 len(file_apache_rocketmq_v2_definition_proto_rawDesc)),
-                       NumEnums:      13,
-                       NumMessages:   22,
+                       NumEnums:      14,
+                       NumMessages:   23,
                        NumExtensions: 0,
                        NumServices:   0,
                },
diff --git a/golang/protocol/v2/service.pb.go b/golang/protocol/v2/service.pb.go
index d2895eb5..cdf05778 100644
--- a/golang/protocol/v2/service.pb.go
+++ b/golang/protocol/v2/service.pb.go
@@ -15,20 +15,21 @@
 
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-//     protoc-gen-go v1.36.8
-//     protoc        v5.29.3
+//     protoc-gen-go v1.36.11
+//     protoc        v7.34.0
 // source: apache/rocketmq/v2/service.proto
 
 package v2
 
 import (
+       reflect "reflect"
+       sync "sync"
+       unsafe "unsafe"
+
        protoreflect "google.golang.org/protobuf/reflect/protoreflect"
        protoimpl "google.golang.org/protobuf/runtime/protoimpl"
        durationpb "google.golang.org/protobuf/types/known/durationpb"
        timestamppb "google.golang.org/protobuf/types/known/timestamppb"
-       reflect "reflect"
-       sync "sync"
-       unsafe "unsafe"
 )
 
 const (
@@ -1872,7 +1873,10 @@ type ChangeInvisibleDurationRequest struct {
        // New invisible duration
        InvisibleDuration *durationpb.Duration 
`protobuf:"bytes,4,opt,name=invisible_duration,json=invisibleDuration,proto3" 
json:"invisible_duration,omitempty"`
        // For message tracing
-       MessageId     string 
`protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3" 
json:"message_id,omitempty"`
+       MessageId string  
`protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3" 
json:"message_id,omitempty"`
+       LiteTopic *string 
`protobuf:"bytes,6,opt,name=lite_topic,json=liteTopic,proto3,oneof" 
json:"lite_topic,omitempty"`
+       // If true, server will not increment the retry times for this message
+       Suspend       *bool `protobuf:"varint,7,opt,name=suspend,proto3,oneof" 
json:"suspend,omitempty"`
        unknownFields protoimpl.UnknownFields
        sizeCache     protoimpl.SizeCache
 }
@@ -1942,6 +1946,20 @@ func (x *ChangeInvisibleDurationRequest) GetMessageId() 
string {
        return ""
 }
 
+func (x *ChangeInvisibleDurationRequest) GetLiteTopic() string {
+       if x != nil && x.LiteTopic != nil {
+               return *x.LiteTopic
+       }
+       return ""
+}
+
+func (x *ChangeInvisibleDurationRequest) GetSuspend() bool {
+       if x != nil && x.Suspend != nil {
+               return *x.Suspend
+       }
+       return false
+}
+
 type ChangeInvisibleDurationResponse struct {
        state  protoimpl.MessageState `protogen:"open.v1"`
        Status *Status                
`protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
@@ -2610,8 +2628,9 @@ type SyncLiteSubscriptionRequest struct {
        // consumer group
        Group *Resource `protobuf:"bytes,3,opt,name=group,proto3" 
json:"group,omitempty"`
        // lite subscription set of lite topics
-       LiteTopicSet  []string 
`protobuf:"bytes,4,rep,name=lite_topic_set,json=liteTopicSet,proto3" 
json:"lite_topic_set,omitempty"`
-       Version       *int64   
`protobuf:"varint,5,opt,name=version,proto3,oneof" json:"version,omitempty"`
+       LiteTopicSet  []string      
`protobuf:"bytes,4,rep,name=lite_topic_set,json=liteTopicSet,proto3" 
json:"lite_topic_set,omitempty"`
+       Version       *int64        
`protobuf:"varint,5,opt,name=version,proto3,oneof" json:"version,omitempty"`
+       OffsetOption  *OffsetOption 
`protobuf:"bytes,6,opt,name=offset_option,json=offsetOption,proto3,oneof" 
json:"offset_option,omitempty"`
        unknownFields protoimpl.UnknownFields
        sizeCache     protoimpl.SizeCache
 }
@@ -2681,6 +2700,13 @@ func (x *SyncLiteSubscriptionRequest) GetVersion() int64 
{
        return 0
 }
 
+func (x *SyncLiteSubscriptionRequest) GetOffsetOption() *OffsetOption {
+       if x != nil {
+               return x.OffsetOption
+       }
+       return nil
+}
+
 type SyncLiteSubscriptionResponse struct {
        state         protoimpl.MessageState `protogen:"open.v1"`
        Status        *Status                
`protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
@@ -2861,14 +2887,20 @@ const file_apache_rocketmq_v2_service_proto_rawDesc = 
"" +
        "\x05group\x18\x01 
\x01(\v2\x1c.apache.rocketmq.v2.ResourceH\x00R\x05group\x88\x01\x01B\b\n" +
        "\x06_group\"U\n" +
        "\x1fNotifyClientTerminationResponse\x122\n" +
-       "\x06status\x18\x01 
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\"\x98\x02\n" +
+       "\x06status\x18\x01 
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\"\xf6\x02\n" +
        "\x1eChangeInvisibleDurationRequest\x122\n" +
        "\x05group\x18\x01 
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05group\x122\n" +
        "\x05topic\x18\x02 
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05topic\x12%\n" +
        "\x0ereceipt_handle\x18\x03 \x01(\tR\rreceiptHandle\x12H\n" +
        "\x12invisible_duration\x18\x04 
\x01(\v2\x19.google.protobuf.DurationR\x11invisibleDuration\x12\x1d\n" +
        "\n" +
-       "message_id\x18\x05 \x01(\tR\tmessageId\"|\n" +
+       "message_id\x18\x05 \x01(\tR\tmessageId\x12\"\n" +
+       "\n" +
+       "lite_topic\x18\x06 \x01(\tH\x00R\tliteTopic\x88\x01\x01\x12\x1d\n" +
+       "\asuspend\x18\a \x01(\bH\x01R\asuspend\x88\x01\x01B\r\n" +
+       "\v_lite_topicB\n" +
+       "\n" +
+       "\b_suspend\"|\n" +
        "\x1fChangeInvisibleDurationResponse\x122\n" +
        "\x06status\x18\x01 
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\x12%\n" +
        "\x0ereceipt_handle\x18\x02 \x01(\tR\rreceiptHandle\"\xe6\x02\n" +
@@ -2913,15 +2945,17 @@ const file_apache_rocketmq_v2_service_proto_rawDesc = 
"" +
        "\x15RecallMessageResponse\x122\n" +
        "\x06status\x18\x01 
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\x12\x1d\n" +
        "\n" +
-       "message_id\x18\x02 \x01(\tR\tmessageId\"\x9a\x02\n" +
+       "message_id\x18\x02 \x01(\tR\tmessageId\"\xf8\x02\n" +
        "\x1bSyncLiteSubscriptionRequest\x12B\n" +
        "\x06action\x18\x01 
\x01(\x0e2*.apache.rocketmq.v2.LiteSubscriptionActionR\x06action\x122\n" +
        "\x05topic\x18\x02 
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05topic\x122\n" +
        "\x05group\x18\x03 
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05group\x12$\n" +
        "\x0elite_topic_set\x18\x04 \x03(\tR\fliteTopicSet\x12\x1d\n" +
-       "\aversion\x18\x05 \x01(\x03H\x00R\aversion\x88\x01\x01B\n" +
+       "\aversion\x18\x05 \x01(\x03H\x00R\aversion\x88\x01\x01\x12J\n" +
+       "\roffset_option\x18\x06 \x01(\v2 
.apache.rocketmq.v2.OffsetOptionH\x01R\foffsetOption\x88\x01\x01B\n" +
        "\n" +
-       "\b_version\"R\n" +
+       "\b_versionB\x10\n" +
+       "\x0e_offset_option\"R\n" +
        "\x1cSyncLiteSubscriptionResponse\x122\n" +
        "\x06status\x18\x01 
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status2\xcc\x0e\n" +
        "\x10MessagingService\x12]\n" +
@@ -3018,6 +3052,7 @@ var file_apache_rocketmq_v2_service_proto_goTypes = []any{
        (*Settings)(nil),                                // 55: 
apache.rocketmq.v2.Settings
        (QueryOffsetPolicy)(0),                          // 56: 
apache.rocketmq.v2.QueryOffsetPolicy
        (LiteSubscriptionAction)(0),                     // 57: 
apache.rocketmq.v2.LiteSubscriptionAction
+       (*OffsetOption)(nil),                            // 58: 
apache.rocketmq.v2.OffsetOption
 }
 var file_apache_rocketmq_v2_service_proto_depIdxs = []int32{
        43, // 0: apache.rocketmq.v2.QueryRouteRequest.topic:type_name -> 
apache.rocketmq.v2.Resource
@@ -3095,46 +3130,47 @@ var file_apache_rocketmq_v2_service_proto_depIdxs = 
[]int32{
        57, // 72: 
apache.rocketmq.v2.SyncLiteSubscriptionRequest.action:type_name -> 
apache.rocketmq.v2.LiteSubscriptionAction
        43, // 73: 
apache.rocketmq.v2.SyncLiteSubscriptionRequest.topic:type_name -> 
apache.rocketmq.v2.Resource
        43, // 74: 
apache.rocketmq.v2.SyncLiteSubscriptionRequest.group:type_name -> 
apache.rocketmq.v2.Resource
-       45, // 75: 
apache.rocketmq.v2.SyncLiteSubscriptionResponse.status:type_name -> 
apache.rocketmq.v2.Status
-       0,  // 76: apache.rocketmq.v2.MessagingService.QueryRoute:input_type -> 
apache.rocketmq.v2.QueryRouteRequest
-       15, // 77: apache.rocketmq.v2.MessagingService.Heartbeat:input_type -> 
apache.rocketmq.v2.HeartbeatRequest
-       2,  // 78: apache.rocketmq.v2.MessagingService.SendMessage:input_type 
-> apache.rocketmq.v2.SendMessageRequest
-       5,  // 79: 
apache.rocketmq.v2.MessagingService.QueryAssignment:input_type -> 
apache.rocketmq.v2.QueryAssignmentRequest
-       7,  // 80: 
apache.rocketmq.v2.MessagingService.ReceiveMessage:input_type -> 
apache.rocketmq.v2.ReceiveMessageRequest
-       10, // 81: apache.rocketmq.v2.MessagingService.AckMessage:input_type -> 
apache.rocketmq.v2.AckMessageRequest
-       13, // 82: 
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:input_type 
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest
-       31, // 83: apache.rocketmq.v2.MessagingService.PullMessage:input_type 
-> apache.rocketmq.v2.PullMessageRequest
-       33, // 84: apache.rocketmq.v2.MessagingService.UpdateOffset:input_type 
-> apache.rocketmq.v2.UpdateOffsetRequest
-       35, // 85: apache.rocketmq.v2.MessagingService.GetOffset:input_type -> 
apache.rocketmq.v2.GetOffsetRequest
-       37, // 86: apache.rocketmq.v2.MessagingService.QueryOffset:input_type 
-> apache.rocketmq.v2.QueryOffsetRequest
-       17, // 87: 
apache.rocketmq.v2.MessagingService.EndTransaction:input_type -> 
apache.rocketmq.v2.EndTransactionRequest
-       26, // 88: apache.rocketmq.v2.MessagingService.Telemetry:input_type -> 
apache.rocketmq.v2.TelemetryCommand
-       27, // 89: 
apache.rocketmq.v2.MessagingService.NotifyClientTermination:input_type -> 
apache.rocketmq.v2.NotifyClientTerminationRequest
-       29, // 90: 
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:input_type -> 
apache.rocketmq.v2.ChangeInvisibleDurationRequest
-       39, // 91: apache.rocketmq.v2.MessagingService.RecallMessage:input_type 
-> apache.rocketmq.v2.RecallMessageRequest
-       41, // 92: 
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:input_type -> 
apache.rocketmq.v2.SyncLiteSubscriptionRequest
-       1,  // 93: apache.rocketmq.v2.MessagingService.QueryRoute:output_type 
-> apache.rocketmq.v2.QueryRouteResponse
-       16, // 94: apache.rocketmq.v2.MessagingService.Heartbeat:output_type -> 
apache.rocketmq.v2.HeartbeatResponse
-       4,  // 95: apache.rocketmq.v2.MessagingService.SendMessage:output_type 
-> apache.rocketmq.v2.SendMessageResponse
-       6,  // 96: 
apache.rocketmq.v2.MessagingService.QueryAssignment:output_type -> 
apache.rocketmq.v2.QueryAssignmentResponse
-       8,  // 97: 
apache.rocketmq.v2.MessagingService.ReceiveMessage:output_type -> 
apache.rocketmq.v2.ReceiveMessageResponse
-       12, // 98: apache.rocketmq.v2.MessagingService.AckMessage:output_type 
-> apache.rocketmq.v2.AckMessageResponse
-       14, // 99: 
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:output_type 
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse
-       32, // 100: apache.rocketmq.v2.MessagingService.PullMessage:output_type 
-> apache.rocketmq.v2.PullMessageResponse
-       34, // 101: 
apache.rocketmq.v2.MessagingService.UpdateOffset:output_type -> 
apache.rocketmq.v2.UpdateOffsetResponse
-       36, // 102: apache.rocketmq.v2.MessagingService.GetOffset:output_type 
-> apache.rocketmq.v2.GetOffsetResponse
-       38, // 103: apache.rocketmq.v2.MessagingService.QueryOffset:output_type 
-> apache.rocketmq.v2.QueryOffsetResponse
-       18, // 104: 
apache.rocketmq.v2.MessagingService.EndTransaction:output_type -> 
apache.rocketmq.v2.EndTransactionResponse
-       26, // 105: apache.rocketmq.v2.MessagingService.Telemetry:output_type 
-> apache.rocketmq.v2.TelemetryCommand
-       28, // 106: 
apache.rocketmq.v2.MessagingService.NotifyClientTermination:output_type -> 
apache.rocketmq.v2.NotifyClientTerminationResponse
-       30, // 107: 
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:output_type -> 
apache.rocketmq.v2.ChangeInvisibleDurationResponse
-       40, // 108: 
apache.rocketmq.v2.MessagingService.RecallMessage:output_type -> 
apache.rocketmq.v2.RecallMessageResponse
-       42, // 109: 
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:output_type -> 
apache.rocketmq.v2.SyncLiteSubscriptionResponse
-       93, // [93:110] is the sub-list for method output_type
-       76, // [76:93] is the sub-list for method input_type
-       76, // [76:76] is the sub-list for extension type_name
-       76, // [76:76] is the sub-list for extension extendee
-       0,  // [0:76] is the sub-list for field type_name
+       58, // 75: 
apache.rocketmq.v2.SyncLiteSubscriptionRequest.offset_option:type_name -> 
apache.rocketmq.v2.OffsetOption
+       45, // 76: 
apache.rocketmq.v2.SyncLiteSubscriptionResponse.status:type_name -> 
apache.rocketmq.v2.Status
+       0,  // 77: apache.rocketmq.v2.MessagingService.QueryRoute:input_type -> 
apache.rocketmq.v2.QueryRouteRequest
+       15, // 78: apache.rocketmq.v2.MessagingService.Heartbeat:input_type -> 
apache.rocketmq.v2.HeartbeatRequest
+       2,  // 79: apache.rocketmq.v2.MessagingService.SendMessage:input_type 
-> apache.rocketmq.v2.SendMessageRequest
+       5,  // 80: 
apache.rocketmq.v2.MessagingService.QueryAssignment:input_type -> 
apache.rocketmq.v2.QueryAssignmentRequest
+       7,  // 81: 
apache.rocketmq.v2.MessagingService.ReceiveMessage:input_type -> 
apache.rocketmq.v2.ReceiveMessageRequest
+       10, // 82: apache.rocketmq.v2.MessagingService.AckMessage:input_type -> 
apache.rocketmq.v2.AckMessageRequest
+       13, // 83: 
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:input_type 
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest
+       31, // 84: apache.rocketmq.v2.MessagingService.PullMessage:input_type 
-> apache.rocketmq.v2.PullMessageRequest
+       33, // 85: apache.rocketmq.v2.MessagingService.UpdateOffset:input_type 
-> apache.rocketmq.v2.UpdateOffsetRequest
+       35, // 86: apache.rocketmq.v2.MessagingService.GetOffset:input_type -> 
apache.rocketmq.v2.GetOffsetRequest
+       37, // 87: apache.rocketmq.v2.MessagingService.QueryOffset:input_type 
-> apache.rocketmq.v2.QueryOffsetRequest
+       17, // 88: 
apache.rocketmq.v2.MessagingService.EndTransaction:input_type -> 
apache.rocketmq.v2.EndTransactionRequest
+       26, // 89: apache.rocketmq.v2.MessagingService.Telemetry:input_type -> 
apache.rocketmq.v2.TelemetryCommand
+       27, // 90: 
apache.rocketmq.v2.MessagingService.NotifyClientTermination:input_type -> 
apache.rocketmq.v2.NotifyClientTerminationRequest
+       29, // 91: 
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:input_type -> 
apache.rocketmq.v2.ChangeInvisibleDurationRequest
+       39, // 92: apache.rocketmq.v2.MessagingService.RecallMessage:input_type 
-> apache.rocketmq.v2.RecallMessageRequest
+       41, // 93: 
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:input_type -> 
apache.rocketmq.v2.SyncLiteSubscriptionRequest
+       1,  // 94: apache.rocketmq.v2.MessagingService.QueryRoute:output_type 
-> apache.rocketmq.v2.QueryRouteResponse
+       16, // 95: apache.rocketmq.v2.MessagingService.Heartbeat:output_type -> 
apache.rocketmq.v2.HeartbeatResponse
+       4,  // 96: apache.rocketmq.v2.MessagingService.SendMessage:output_type 
-> apache.rocketmq.v2.SendMessageResponse
+       6,  // 97: 
apache.rocketmq.v2.MessagingService.QueryAssignment:output_type -> 
apache.rocketmq.v2.QueryAssignmentResponse
+       8,  // 98: 
apache.rocketmq.v2.MessagingService.ReceiveMessage:output_type -> 
apache.rocketmq.v2.ReceiveMessageResponse
+       12, // 99: apache.rocketmq.v2.MessagingService.AckMessage:output_type 
-> apache.rocketmq.v2.AckMessageResponse
+       14, // 100: 
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:output_type 
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse
+       32, // 101: apache.rocketmq.v2.MessagingService.PullMessage:output_type 
-> apache.rocketmq.v2.PullMessageResponse
+       34, // 102: 
apache.rocketmq.v2.MessagingService.UpdateOffset:output_type -> 
apache.rocketmq.v2.UpdateOffsetResponse
+       36, // 103: apache.rocketmq.v2.MessagingService.GetOffset:output_type 
-> apache.rocketmq.v2.GetOffsetResponse
+       38, // 104: apache.rocketmq.v2.MessagingService.QueryOffset:output_type 
-> apache.rocketmq.v2.QueryOffsetResponse
+       18, // 105: 
apache.rocketmq.v2.MessagingService.EndTransaction:output_type -> 
apache.rocketmq.v2.EndTransactionResponse
+       26, // 106: apache.rocketmq.v2.MessagingService.Telemetry:output_type 
-> apache.rocketmq.v2.TelemetryCommand
+       28, // 107: 
apache.rocketmq.v2.MessagingService.NotifyClientTermination:output_type -> 
apache.rocketmq.v2.NotifyClientTerminationResponse
+       30, // 108: 
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:output_type -> 
apache.rocketmq.v2.ChangeInvisibleDurationResponse
+       40, // 109: 
apache.rocketmq.v2.MessagingService.RecallMessage:output_type -> 
apache.rocketmq.v2.RecallMessageResponse
+       42, // 110: 
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:output_type -> 
apache.rocketmq.v2.SyncLiteSubscriptionResponse
+       94, // [94:111] is the sub-list for method output_type
+       77, // [77:94] is the sub-list for method input_type
+       77, // [77:77] is the sub-list for extension type_name
+       77, // [77:77] is the sub-list for extension extendee
+       0,  // [0:77] is the sub-list for field type_name
 }
 
 func init() { file_apache_rocketmq_v2_service_proto_init() }
@@ -3164,6 +3200,7 @@ func file_apache_rocketmq_v2_service_proto_init() {
                (*TelemetryCommand_NotifyUnsubscribeLiteCommand)(nil),
        }
        file_apache_rocketmq_v2_service_proto_msgTypes[27].OneofWrappers = 
[]any{}
+       file_apache_rocketmq_v2_service_proto_msgTypes[29].OneofWrappers = 
[]any{}
        file_apache_rocketmq_v2_service_proto_msgTypes[32].OneofWrappers = 
[]any{
                (*PullMessageResponse_Status)(nil),
                (*PullMessageResponse_Message)(nil),
diff --git a/golang/protocol/v2/service_grpc.pb.go 
b/golang/protocol/v2/service_grpc.pb.go
index b6ecd418..61073218 100644
--- a/golang/protocol/v2/service_grpc.pb.go
+++ b/golang/protocol/v2/service_grpc.pb.go
@@ -15,14 +15,15 @@
 
 // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
 // versions:
-// - protoc-gen-go-grpc v1.5.1
-// - protoc             v5.29.3
+// - protoc-gen-go-grpc v1.6.1
+// - protoc             v7.34.0
 // source: apache/rocketmq/v2/service.proto
 
 package v2
 
 import (
        context "context"
+
        grpc "google.golang.org/grpc"
        codes "google.golang.org/grpc/codes"
        status "google.golang.org/grpc/status"
@@ -491,55 +492,55 @@ type MessagingServiceServer interface {
 type UnimplementedMessagingServiceServer struct{}
 
 func (UnimplementedMessagingServiceServer) QueryRoute(context.Context, 
*QueryRouteRequest) (*QueryRouteResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method QueryRoute not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method QueryRoute not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) Heartbeat(context.Context, 
*HeartbeatRequest) (*HeartbeatResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method Heartbeat not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method Heartbeat not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) SendMessage(context.Context, 
*SendMessageRequest) (*SendMessageResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method SendMessage not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method SendMessage not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) QueryAssignment(context.Context, 
*QueryAssignmentRequest) (*QueryAssignmentResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method QueryAssignment 
not implemented")
+       return nil, status.Error(codes.Unimplemented, "method QueryAssignment 
not implemented")
 }
 func (UnimplementedMessagingServiceServer) 
ReceiveMessage(*ReceiveMessageRequest, 
grpc.ServerStreamingServer[ReceiveMessageResponse]) error {
-       return status.Errorf(codes.Unimplemented, "method ReceiveMessage not 
implemented")
+       return status.Error(codes.Unimplemented, "method ReceiveMessage not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) AckMessage(context.Context, 
*AckMessageRequest) (*AckMessageResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method AckMessage not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method AckMessage not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) 
ForwardMessageToDeadLetterQueue(context.Context, 
*ForwardMessageToDeadLetterQueueRequest) 
(*ForwardMessageToDeadLetterQueueResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method 
ForwardMessageToDeadLetterQueue not implemented")
+       return nil, status.Error(codes.Unimplemented, "method 
ForwardMessageToDeadLetterQueue not implemented")
 }
 func (UnimplementedMessagingServiceServer) PullMessage(*PullMessageRequest, 
grpc.ServerStreamingServer[PullMessageResponse]) error {
-       return status.Errorf(codes.Unimplemented, "method PullMessage not 
implemented")
+       return status.Error(codes.Unimplemented, "method PullMessage not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) UpdateOffset(context.Context, 
*UpdateOffsetRequest) (*UpdateOffsetResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method UpdateOffset not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method UpdateOffset not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) GetOffset(context.Context, 
*GetOffsetRequest) (*GetOffsetResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method GetOffset not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method GetOffset not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) QueryOffset(context.Context, 
*QueryOffsetRequest) (*QueryOffsetResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method QueryOffset not 
implemented")
+       return nil, status.Error(codes.Unimplemented, "method QueryOffset not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) EndTransaction(context.Context, 
*EndTransactionRequest) (*EndTransactionResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method EndTransaction 
not implemented")
+       return nil, status.Error(codes.Unimplemented, "method EndTransaction 
not implemented")
 }
 func (UnimplementedMessagingServiceServer) 
Telemetry(grpc.BidiStreamingServer[TelemetryCommand, TelemetryCommand]) error {
-       return status.Errorf(codes.Unimplemented, "method Telemetry not 
implemented")
+       return status.Error(codes.Unimplemented, "method Telemetry not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) 
NotifyClientTermination(context.Context, *NotifyClientTerminationRequest) 
(*NotifyClientTerminationResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method 
NotifyClientTermination not implemented")
+       return nil, status.Error(codes.Unimplemented, "method 
NotifyClientTermination not implemented")
 }
 func (UnimplementedMessagingServiceServer) 
ChangeInvisibleDuration(context.Context, *ChangeInvisibleDurationRequest) 
(*ChangeInvisibleDurationResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method 
ChangeInvisibleDuration not implemented")
+       return nil, status.Error(codes.Unimplemented, "method 
ChangeInvisibleDuration not implemented")
 }
 func (UnimplementedMessagingServiceServer) RecallMessage(context.Context, 
*RecallMessageRequest) (*RecallMessageResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method RecallMessage 
not implemented")
+       return nil, status.Error(codes.Unimplemented, "method RecallMessage not 
implemented")
 }
 func (UnimplementedMessagingServiceServer) 
SyncLiteSubscription(context.Context, *SyncLiteSubscriptionRequest) 
(*SyncLiteSubscriptionResponse, error) {
-       return nil, status.Errorf(codes.Unimplemented, "method 
SyncLiteSubscription not implemented")
+       return nil, status.Error(codes.Unimplemented, "method 
SyncLiteSubscription not implemented")
 }
 func (UnimplementedMessagingServiceServer) 
mustEmbedUnimplementedMessagingServiceServer() {}
 func (UnimplementedMessagingServiceServer) testEmbeddedByValue()               
           {}
@@ -552,7 +553,7 @@ type UnsafeMessagingServiceServer interface {
 }
 
 func RegisterMessagingServiceServer(s grpc.ServiceRegistrar, srv 
MessagingServiceServer) {
-       // If the following call pancis, it indicates 
UnimplementedMessagingServiceServer was
+       // If the following call panics, it indicates 
UnimplementedMessagingServiceServer was
        // embedded by pointer and is nil.  This will cause panics if an
        // unimplemented method is ever invoked, so we test this at 
initialization
        // time to prevent it from happening at runtime later due to I/O.
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 21d63074..237ef4b5 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -105,6 +105,14 @@ func (pc *defaultPushConsumer) 
changeInvisibleDuration0(context context.Context,
                InvisibleDuration: durationpb.New(invisibleDuration),
                MessageId:         messageView.GetMessageId(),
        }
+
+       // Set LiteTopic only for lite consumer
+       if messageView.GetLiteTopic() != "" && pc.pcSettings.GetClientType() == 
v2.ClientType_LITE_PUSH_CONSUMER {
+               request.LiteTopic = &messageView.liteTopic
+               suspend := true
+               request.Suspend = &suspend
+       }
+
        watchTime := time.Now()
        resp, err := pc.cli.clientManager.ChangeInvisibleDuration(ctx, 
endpoints, request, pc.pcSettings.requestTimeout)
        duration := time.Since(watchTime)
@@ -173,35 +181,23 @@ func (pc *defaultPushConsumer) 
wrapReceiveMessageRequest(batchSize int, messageQ
 }
 
 func (pc *defaultPushConsumer) wrapAckMessageRequest(messageView *MessageView) 
*v2.AckMessageRequest {
+       entry := &v2.AckMessageEntry{
+               MessageId:     messageView.GetMessageId(),
+               ReceiptHandle: messageView.GetReceiptHandle(),
+       }
+
+       // Set LiteTopic only for lite consumer
        if messageView.GetLiteTopic() != "" && pc.pcSettings.GetClientType() == 
v2.ClientType_LITE_PUSH_CONSUMER {
-               return &v2.AckMessageRequest{
-                       Group: pc.pcSettings.groupName,
-                       Topic: &v2.Resource{
-                               Name:              messageView.GetTopic(),
-                               ResourceNamespace: pc.cli.config.NameSpace,
-                       },
-                       Entries: []*v2.AckMessageEntry{
-                               {
-                                       MessageId:     
messageView.GetMessageId(),
-                                       ReceiptHandle: 
messageView.GetReceiptHandle(),
-                                       LiteTopic:     &messageView.liteTopic,
-                               },
-                       },
-               }
-       } else {
-               return &v2.AckMessageRequest{
-                       Group: pc.pcSettings.groupName,
-                       Topic: &v2.Resource{
-                               Name:              messageView.GetTopic(),
-                               ResourceNamespace: pc.cli.config.NameSpace,
-                       },
-                       Entries: []*v2.AckMessageEntry{
-                               {
-                                       MessageId:     
messageView.GetMessageId(),
-                                       ReceiptHandle: 
messageView.GetReceiptHandle(),
-                               },
-                       },
-               }
+               entry.LiteTopic = &messageView.liteTopic
+       }
+
+       return &v2.AckMessageRequest{
+               Group: pc.pcSettings.groupName,
+               Topic: &v2.Resource{
+                       Name:              messageView.GetTopic(),
+                       ResourceNamespace: pc.cli.config.NameSpace,
+               },
+               Entries: []*v2.AckMessageEntry{entry},
        }
 }
 
@@ -378,8 +374,13 @@ func (pc *defaultPushConsumer) Start() error {
 
        threadPool := NewSimpleThreadPool("MessageConsumption", 
int(pc.pcOpts.maxCacheMessageCount), int(pc.pcOpts.consumptionThreadCount))
        if pc.pcSettings.isFifo {
-               pc.consumerService = NewFiFoConsumeService(pc.cli.clientID, 
pc.pcOpts.messageListener, threadPool, pc.cli, 
pc.pcOpts.enableFifoConsumeAccelerator)
-               pc.cli.log.Infof("Create FIFO consume service, 
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t", 
pc.cli.config.ConsumerGroup, pc.cli.clientID, 
pc.pcOpts.enableFifoConsumeAccelerator)
+               if pc.pcSettings.GetClientType() == 
v2.ClientType_LITE_PUSH_CONSUMER {
+                       pc.consumerService = 
NewLiteFifoConsumeService(pc.cli.clientID, pc.pcOpts.messageListener, 
threadPool, pc.cli, pc.pcOpts.enableFifoConsumeAccelerator)
+                       pc.cli.log.Infof("Create Lite FIFO consume service, 
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t", 
pc.cli.config.ConsumerGroup, pc.cli.clientID, 
pc.pcOpts.enableFifoConsumeAccelerator)
+               } else {
+                       pc.consumerService = 
NewFiFoConsumeService(pc.cli.clientID, pc.pcOpts.messageListener, threadPool, 
pc.cli, pc.pcOpts.enableFifoConsumeAccelerator)
+                       pc.cli.log.Infof("Create FIFO consume service, 
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t", 
pc.cli.config.ConsumerGroup, pc.cli.clientID, 
pc.pcOpts.enableFifoConsumeAccelerator)
+               }
        } else {
                pc.consumerService = NewStandardConsumeService(pc.cli.clientID, 
pc.pcOpts.messageListener, threadPool, pc.cli)
                pc.cli.log.Infof("Create standard consume service, 
consumerGroup=%s, clientId=%s", pc.cli.config.ConsumerGroup, pc.cli.clientID)
@@ -601,7 +602,7 @@ func (pc *defaultPushConsumer) ack0(ctx context.Context, 
messageView *MessageVie
 }
 
 func (pc *defaultPushConsumer) 
wrapForwardMessageToDeadLetterQueueRequest(messageView *MessageView) 
*v2.ForwardMessageToDeadLetterQueueRequest {
-       return &v2.ForwardMessageToDeadLetterQueueRequest{
+       request := &v2.ForwardMessageToDeadLetterQueueRequest{
                Group: pc.pcSettings.groupName,
                Topic: &v2.Resource{
                        Name:              messageView.GetTopic(),
@@ -612,6 +613,13 @@ func (pc *defaultPushConsumer) 
wrapForwardMessageToDeadLetterQueueRequest(messag
                DeliveryAttempt:     messageView.GetDeliveryAttempt(),
                MaxDeliveryAttempts: pc.pcSettings.GetRetryPolicy().MaxAttempts,
        }
+
+       // Set LiteTopic only for lite consumer
+       if messageView.GetLiteTopic() != "" && pc.pcSettings.GetClientType() == 
v2.ClientType_LITE_PUSH_CONSUMER {
+               request.LiteTopic = &messageView.liteTopic
+       }
+
+       return request
 }
 
 func (pc *defaultPushConsumer) ForwardMessageToDeadLetterQueue(ctx 
context.Context, messageView *MessageView) error {
diff --git a/golang/push_consumer_options.go b/golang/push_consumer_options.go
index 411fca19..b62572e5 100644
--- a/golang/push_consumer_options.go
+++ b/golang/push_consumer_options.go
@@ -26,19 +26,44 @@ import (
        "google.golang.org/protobuf/types/known/durationpb"
 )
 
-type ConsumerResult int8
+// ConsumerResultType represents the type of consumer result
+type ConsumerResultType int8
 
 const (
-       /**
-        * Consume message successfully.
-        */
-       SUCCESS ConsumerResult = 0
-       /**
-        * Failed to consume message.
-        */
-       FAILURE ConsumerResult = 1
+       // ConsumerResultTypeSuccess represents successful consumption
+       ConsumerResultTypeSuccess ConsumerResultType = 0
+       // ConsumerResultTypeFailure represents failed consumption
+       ConsumerResultTypeFailure ConsumerResultType = 1
+       // ConsumerResultTypeSuspend represents suspended consumption
+       ConsumerResultTypeSuspend ConsumerResultType = 2
 )
 
+// Predefined consumer results for backward compatibility
+var (
+       SUCCESS = ConsumerResult{Type: ConsumerResultTypeSuccess}
+       FAILURE = ConsumerResult{Type: ConsumerResultTypeFailure}
+)
+
+// Minimum suspend time allowed
+const minSuspendTime = 50 * time.Millisecond
+
+// NewConsumerResultSuspend creates a new ConsumerResult with the specified 
suspend time
+func NewConsumerResultSuspend(suspendTime time.Duration) ConsumerResult {
+       if suspendTime < minSuspendTime {
+               panic(fmt.Sprintf("suspend time cannot be less than %v, got 
%v", minSuspendTime, suspendTime))
+       }
+       return ConsumerResult{
+               Type:        ConsumerResultTypeSuspend,
+               suspendTime: suspendTime,
+       }
+}
+
+// ConsumerResult represents the result of message consumption
+type ConsumerResult struct {
+       Type        ConsumerResultType
+       suspendTime time.Duration
+}
+
 type MessageListener interface {
        consume(*MessageView) ConsumerResult
 }
@@ -55,23 +80,23 @@ func (l *FuncMessageListener) consume(msg *MessageView) 
ConsumerResult {
 var _ = MessageListener(&FuncMessageListener{})
 
 type pushConsumerOptions struct {
-       subscriptionExpressions         *sync.Map
-       awaitDuration                   time.Duration
-       maxCacheMessageCount            int32
-       maxCacheMessageSizeInBytes      int64
-       consumptionThreadCount          int32
-       messageListener                 MessageListener
-       clientFunc                      NewClientFunc
-       enableFifoConsumeAccelerator    bool
+       subscriptionExpressions      *sync.Map
+       awaitDuration                time.Duration
+       maxCacheMessageCount         int32
+       maxCacheMessageSizeInBytes   int64
+       consumptionThreadCount       int32
+       messageListener              MessageListener
+       clientFunc                   NewClientFunc
+       enableFifoConsumeAccelerator bool
 }
 
 var defaultPushConsumerOptions = pushConsumerOptions{
-       clientFunc:                    NewClient,
-       awaitDuration:                 0,
-       maxCacheMessageCount:          1024,
-       maxCacheMessageSizeInBytes:    64 * 1024 * 1024,
-       consumptionThreadCount:        20,
-       enableFifoConsumeAccelerator:   false,
+       clientFunc:                   NewClient,
+       awaitDuration:                0,
+       maxCacheMessageCount:         1024,
+       maxCacheMessageSizeInBytes:   64 * 1024 * 1024,
+       consumptionThreadCount:       20,
+       enableFifoConsumeAccelerator: false,
 }
 
 // A ConsumerOption sets options such as tag, etc.
diff --git a/golang/push_consumer_test.go b/golang/push_consumer_test.go
index 45b0ddbd..85d43037 100644
--- a/golang/push_consumer_test.go
+++ b/golang/push_consumer_test.go
@@ -233,3 +233,67 @@ func TestDefaultLitePushConsumer_Wraps(t *testing.T) {
                t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v", 
hb.GetClientType())
        }
 }
+
+// TestNewConsumerResultSuspend tests the NewConsumerResultSuspend function
+func TestNewConsumerResultSuspend(t *testing.T) {
+       tests := []struct {
+               name        string
+               suspendTime time.Duration
+               wantType    ConsumerResultType
+               wantPanic   bool
+       }{
+               {
+                       name:        "valid suspend time - exactly 
minSuspendTime",
+                       suspendTime: minSuspendTime,
+                       wantType:    ConsumerResultTypeSuspend,
+                       wantPanic:   false,
+               },
+               {
+                       name:        "valid suspend time - 1 second",
+                       suspendTime: 1 * time.Second,
+                       wantType:    ConsumerResultTypeSuspend,
+                       wantPanic:   false,
+               },
+               {
+                       name:        "invalid suspend time - less than 
minSuspendTime",
+                       suspendTime: 10 * time.Millisecond,
+                       wantType:    ConsumerResultTypeSuspend,
+                       wantPanic:   true,
+               },
+               {
+                       name:        "invalid suspend time - negative",
+                       suspendTime: -1 * time.Millisecond,
+                       wantType:    ConsumerResultTypeSuspend,
+                       wantPanic:   true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if tt.wantPanic {
+                               defer func() {
+                                       if r := recover(); r == nil {
+                                               
t.Errorf("NewConsumerResultSuspend() should have panicked for suspendTime=%v", 
tt.suspendTime)
+                                       }
+                               }()
+                               NewConsumerResultSuspend(tt.suspendTime)
+                       } else {
+                               result := 
NewConsumerResultSuspend(tt.suspendTime)
+                               if result.Type != tt.wantType {
+                                       t.Errorf("NewConsumerResultSuspend() 
Type = %v, want %v", result.Type, tt.wantType)
+                               }
+                       }
+               })
+       }
+}
+
+// TestPredefinedConsumerResults tests the predefined SUCCESS and FAILURE 
constants
+func TestPredefinedConsumerResults(t *testing.T) {
+       if SUCCESS.Type != ConsumerResultTypeSuccess {
+               t.Errorf("SUCCESS.Type = %v, want %v", SUCCESS.Type, 
ConsumerResultTypeSuccess)
+       }
+
+       if FAILURE.Type != ConsumerResultTypeFailure {
+               t.Errorf("FAILURE.Type = %v, want %v", FAILURE.Type, 
ConsumerResultTypeFailure)
+       }
+}

Reply via email to