This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a96e5900 refactor(lite_push_consumer): enable Lite Push Consumer to
suspend consumption (#1203)
a96e5900 is described below
commit a96e59003f6c93be9d301f5a4eb36e2ab4caed25
Author: Quan <[email protected]>
AuthorDate: Fri Mar 20 15:48:27 2026 +0800
refactor(lite_push_consumer): enable Lite Push Consumer to suspend
consumption (#1203)
Change-Id: Ia190bd12af80ded00a20cfe83712918d0f162dff
---
golang/consumer_service.go | 144 +++++++++++--
golang/lite_push_consumer.go | 23 ++-
golang/process_queue.go | 45 ++++-
golang/protocol/v2/admin.pb.go | 9 +-
golang/protocol/v2/admin_grpc.pb.go | 9 +-
golang/protocol/v2/definition.pb.go | 368 ++++++++++++++++++++++++++--------
golang/protocol/v2/service.pb.go | 143 ++++++++-----
golang/protocol/v2/service_grpc.pb.go | 41 ++--
golang/push_consumer.go | 70 ++++---
golang/push_consumer_options.go | 71 ++++---
golang/push_consumer_test.go | 64 ++++++
11 files changed, 723 insertions(+), 264 deletions(-)
diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 316749fb..7aad7465 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -22,6 +22,20 @@ import (
"time"
)
+var (
+ messageGroupExtractor = func(mv *MessageView) string {
+ messageGroup := mv.GetMessageGroup()
+ if messageGroup != nil {
+ return *messageGroup
+ }
+ return ""
+ }
+
+ liteTopicExtractor = func(mv *MessageView) string {
+ return mv.GetLiteTopic()
+ }
+)
+
type ConsumeService interface {
consume(ProcessQueue, []*MessageView)
consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult,
error))
@@ -93,7 +107,8 @@ func (bcs *baseConsumeService) newConsumeTask(clientId
string, messageListener M
}()
duration := time.Since(startTime)
status := MessageHookPointsStatus_ERROR
- if consumeResult == SUCCESS {
+ // Check if result is SUCCESS or SUSPEND (considered as
successful)
+ if consumeResult.Type == ConsumerResultTypeSuccess ||
consumeResult.Type == ConsumerResultTypeSuspend {
status = MessageHookPointsStatus_OK
}
messageInterceptor.doAfter(MessageHookPoints_CONSUME,
[]*MessageCommon{messageView.GetMessageCommon()}, duration, status)
@@ -101,6 +116,7 @@ func (bcs *baseConsumeService) newConsumeTask(clientId
string, messageListener M
}
var _ = ConsumeService(&standardConsumeService{})
+var _ = ConsumeService(&liteFifoConsumeService{})
type standardConsumeService struct {
baseConsumeService
@@ -133,37 +149,50 @@ func NewStandardConsumeService(clientId string,
messageListener MessageListener,
}
}
-func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews
[]*MessageView) {
- if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
- fcs.consumeIteratively(pq, &messageViews, 0)
- return
- }
- // Group messages by messageGroup
- messageViewsGroupByMessageGroup := make(map[string][]*MessageView)
- messageViewsWithoutMessageGroup := make([]*MessageView, 0)
+// groupMessageBy groups messages by applying the provided groupKeyExtractor
function
+// It returns a map of grouped messages and a slice of messages without a group key
+func groupMessageBy(messageViews []*MessageView, groupKeyExtractor
func(*MessageView) string) (map[string][]*MessageView, []*MessageView) {
+ messageViewsGroupByGroupKey := make(map[string][]*MessageView)
+ messageViewsWithoutGroupKey := make([]*MessageView, 0)
+
for _, messageView := range messageViews {
- messageGroup := messageView.GetMessageGroup()
- if messageGroup != nil && *messageGroup != "" {
- messageViewsGroupByMessageGroup[*messageGroup] =
append(messageViewsGroupByMessageGroup[*messageGroup], messageView)
+ groupKey := groupKeyExtractor(messageView)
+ if groupKey != "" {
+ messageViewsGroupByGroupKey[groupKey] =
append(messageViewsGroupByGroupKey[groupKey], messageView)
} else {
- messageViewsWithoutMessageGroup =
append(messageViewsWithoutMessageGroup, messageView)
+ messageViewsWithoutGroupKey =
append(messageViewsWithoutGroupKey, messageView)
}
}
- groupNum := len(messageViewsGroupByMessageGroup)
- if len(messageViewsWithoutMessageGroup) > 0 {
+ groupNum := len(messageViewsGroupByGroupKey)
+ if len(messageViewsWithoutGroupKey) > 0 {
groupNum++
}
sugarBaseLogger.Debugf("FifoConsumeService parallel consume,
messageViewsNum=%d, groupNum=%d", len(messageViews), groupNum)
+ return messageViewsGroupByGroupKey, messageViewsWithoutGroupKey
+}
+
+func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews
[]*MessageView) {
+ if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
+ fcs.consumeIteratively(pq, &messageViews, 0)
+ return
+ }
+
+ messageViewsGroupByGroupKey, messageViewsWithoutGroupKey :=
groupMessageBy(
+ messageViews,
+ messageGroupExtractor,
+ )
+
// Consume messages in parallel by group
- for _, group := range messageViewsGroupByMessageGroup {
+ for _, group := range messageViewsGroupByGroupKey {
fcs.consumeIteratively(pq, &group, 0)
}
- if len(messageViewsWithoutMessageGroup) > 0 {
- fcs.consumeIteratively(pq, &messageViewsWithoutMessageGroup, 0)
+ if len(messageViewsWithoutGroupKey) > 0 {
+ fcs.consumeIteratively(pq, &messageViewsWithoutGroupKey, 0)
}
}
+
func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue,
messageViewsPtr *[]*MessageView, ptr int) {
if messageViewsPtr == nil {
sugarBaseLogger.Errorf("[Bug] messageViews is nil when
consumeIteratively")
@@ -192,7 +221,82 @@ func (fcs *fifoConsumeService) consumeIteratively(pq
ProcessQueue, messageViewsP
func NewFiFoConsumeService(clientId string, messageListener MessageListener,
consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor,
enableFifoConsumeAccelerator bool) *fifoConsumeService {
return &fifoConsumeService{
- baseConsumeService: *NewBaseConsumeService(clientId,
messageListener, consumptionExecutor, messageInterceptor),
- enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
+ baseConsumeService: *NewBaseConsumeService(clientId,
messageListener, consumptionExecutor, messageInterceptor),
+ enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
+ }
+}
+
+// liteFifoConsumeService is a fifoConsumeService that is used for lite push
consumer
+type liteFifoConsumeService struct {
+ baseConsumeService
+ enableFifoConsumeAccelerator bool
+}
+
+func (lcs *liteFifoConsumeService) consume(pq ProcessQueue, messageViews
[]*MessageView) {
+ if !lcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
+ lcs.consumeIteratively(pq, &messageViews, 0)
+ return
+ }
+
+ messageViewsGroupByGroupKey, messageViewsWithoutGroupKey :=
groupMessageBy(
+ messageViews,
+ liteTopicExtractor,
+ )
+
+ for _, group := range messageViewsGroupByGroupKey {
+ lcs.consumeIteratively(pq, &group, 0)
+ }
+ if len(messageViewsWithoutGroupKey) > 0 {
+ lcs.consumeIteratively(pq, &messageViewsWithoutGroupKey, 0)
+ }
+}
+
+func (lcs *liteFifoConsumeService) consumeIteratively(pq ProcessQueue,
messageViewsPtr *[]*MessageView, ptr int) {
+ if messageViewsPtr == nil {
+ sugarBaseLogger.Errorf("[Bug] messageViews is nil when
consumeIteratively")
+ return
+ }
+ messageViews := *messageViewsPtr
+ if ptr >= len(messageViews) {
+ return
+ }
+ mv := messageViews[ptr]
+ if mv.isCorrupted() {
+ sugarBaseLogger.Errorf("Message is corrupted for FIFO
consumption, prepare to discard it, mq=%s, messageId=%s, clientId=%s",
pq.getMessageQueue().String(), mv.GetMessageId(), lcs.clientId)
+ pq.discardFifoMessage(mv)
+ lcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
+ return
+ }
+ lcs.consumeImmediately(mv, func(result ConsumerResult, err error) {
+ if err != nil {
+ sugarBaseLogger.Errorf("[Bug] Exception raised in
consumption callback, clientId=%s", lcs.clientId)
+ return
+ }
+ pq.eraseFifoMessage(mv, result)
+ if result.Type == ConsumerResultTypeSuspend {
+ // Suspend all messages with the same liteTopic in this
batch
+ newMsgList := make([]*MessageView, 0)
+ for i := ptr + 1; i < len(messageViews); i++ {
+ msgView := messageViews[i]
+ if msgView.GetLiteTopic() == mv.GetLiteTopic() {
+ pq.eraseFifoMessage(msgView, result)
+ } else {
+ newMsgList = append(newMsgList, msgView)
+ }
+ }
+ // Continue processing remaining messages with
different liteTopic
+ if len(newMsgList) > 0 {
+ lcs.consumeIteratively(pq, &newMsgList, 0)
+ }
+ } else {
+ lcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
+ }
+ })
+}
+
+func NewLiteFifoConsumeService(clientId string, messageListener
MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor
MessageInterceptor, enableFifoConsumeAccelerator bool) *liteFifoConsumeService {
+ return &liteFifoConsumeService{
+ baseConsumeService: *NewBaseConsumeService(clientId,
messageListener, consumptionExecutor, messageInterceptor),
+ enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
}
}
diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index 3fa620fe..e78cd1b4 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -107,27 +107,27 @@ func (lpc *defaultLitePushConsumer)
notifyUnsubscribeLite(command *v2.NotifyUnsu
lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
}
-func (lpc *defaultLitePushConsumer) SubscribeLite(topic string) error {
+func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string) error {
if err := lpc.checkRunning(); err != nil {
return err
}
- if err := lpc.syncLiteSubscription(context.TODO(),
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{topic}); err != nil {
- sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite topic:%s
err:%v", topic, err)
+ if err := lpc.syncLiteSubscription(context.TODO(),
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}); err != nil {
+ sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite
liteTopic:%s err:%v", liteTopic, err)
return err
}
- lpc.litePushConsumerSettings.liteTopicSet.Store(topic, struct{}{})
+ lpc.litePushConsumerSettings.liteTopicSet.Store(liteTopic, struct{}{})
return nil
}
-func (lpc *defaultLitePushConsumer) UnSubscribeLite(topic string) error {
+func (lpc *defaultLitePushConsumer) UnSubscribeLite(liteTopic string) error {
if err := lpc.checkRunning(); err != nil {
return err
}
- if err := lpc.syncLiteSubscription(context.TODO(),
v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{topic}); err != nil {
- sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite
topic:%s err:%v", topic, err)
+ if err := lpc.syncLiteSubscription(context.TODO(),
v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}); err != nil {
+ sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite
liteTopic:%s err:%v", liteTopic, err)
return err
}
- lpc.litePushConsumerSettings.liteTopicSet.Delete(topic)
+ lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
return nil
}
@@ -147,9 +147,10 @@ func (lpc *defaultLitePushConsumer)
syncAllLiteSubscription() {
}
return true
})
- if len(liteTopicSet) == 0 {
- return
- }
+ // Sync subscription even when liteTopicSet is empty to keep server
state consistent
+ //if len(liteTopicSet) == 0 {
+ // return
+ //}
if err := lpc.syncLiteSubscription(context.TODO(),
v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet); err != nil {
sugarBaseLogger.Errorf("LitePushConsumer
syncAllLiteSubscription:%v, err:%v", liteTopicSet, err)
}
diff --git a/golang/process_queue.go b/golang/process_queue.go
index 274af8e3..63e709b2 100644
--- a/golang/process_queue.go
+++ b/golang/process_queue.go
@@ -71,6 +71,8 @@ func (dpq *defaultProcessQueue) discardFifoMessage(mv
*MessageView) {
}
func (dpq *defaultProcessQueue) eraseFifoMessage(mv *MessageView, result
ConsumerResult) {
+ result = dpq.convertSuspendResultIfNeeded(result)
+
retryPolicy := dpq.consumer.pcSettings.GetRetryPolicy()
maxAttempts := retryPolicy.MaxAttempts
attempt := mv.GetMessageCommon().deliveryAttempt
@@ -78,7 +80,8 @@ func (dpq *defaultProcessQueue) eraseFifoMessage(mv
*MessageView, result Consume
service := dpq.consumer.consumerService
clientId := dpq.consumer.cli.clientID
- if result == FAILURE && attempt < maxAttempts {
+ // Handle FAILURE result with retry
+ if result.Type == ConsumerResultTypeFailure && attempt < maxAttempts {
nextAttemptDelay := utils.GetNextAttemptDelay(retryPolicy,
int(attempt))
mv.deliveryAttempt += 1
attempt = mv.deliveryAttempt
@@ -91,16 +94,36 @@ func (dpq *defaultProcessQueue) eraseFifoMessage(mv
*MessageView, result Consume
return
}
- if result != SUCCESS {
- dpq.consumer.cli.log.Infof("Failed to consume fifo message
finally, run out of attempt times, maxAttempts=%d, "+
- "attempt=%d, mq=%s, messageId=%s, clientId=%s",
maxAttempts, attempt, dpq.mqstr, messageId, clientId)
- }
- // Ack message or forward it to DLQ depends on consumption result.
- if result == SUCCESS {
+ // Handle SUCCESS result
+ if result.Type == ConsumerResultTypeSuccess {
dpq.ackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
- } else {
- dpq.forwardToDeadLetterQueue(mv, func(error) {
dpq.evictCacheMessage(mv) })
+ return
+ }
+
+ // Handle SUSPEND result
+ if result.Type == ConsumerResultTypeSuspend {
+ dpq.consumer.cli.log.Infof("Suspend consumption,
consumerGroup=%s, topic=%s, liteTopic=%s, messageId=%s, suspendTime=%v",
+ dpq.consumer.groupName, mv.topic, mv.GetLiteTopic(),
messageId, result.suspendTime)
+ dpq.changeInvisibleDuration(mv, result.suspendTime, 1,
func(error) { dpq.evictCacheMessage(mv) })
+ return
}
+
+ // Handle FAILURE result without retry (final failure)
+ dpq.consumer.cli.log.Infof("Failed to consume fifo message finally, run
out of attempt times, maxAttempts=%d, "+
+ "attempt=%d, mq=%s, messageId=%s, clientId=%s", maxAttempts,
attempt, dpq.mqstr, messageId, clientId)
+ dpq.forwardToDeadLetterQueue(mv, func(error) {
dpq.evictCacheMessage(mv) })
+}
+
+func (dpq *defaultProcessQueue) convertSuspendResultIfNeeded(result
ConsumerResult) ConsumerResult {
+ if result.Type == ConsumerResultTypeSuspend {
+ if dpq.consumer.pcSettings.clientType !=
v2.ClientType_LITE_PUSH_CONSUMER {
+ dpq.consumer.cli.log.Warnf("Only LitePushConsumer
supports ConsumeResultSuspend! "+
+ "Convert to FAILURE, consumerGroup=%s,
consumerType=%v",
+ dpq.consumer.groupName,
dpq.consumer.pcSettings.clientType)
+ return FAILURE
+ }
+ }
+ return result
}
func (dpq *defaultProcessQueue) forwardToDeadLetterQueue(mv *MessageView,
callback func(error)) {
@@ -162,14 +185,14 @@ func (dpq *defaultProcessQueue)
forwardToDeadLetterQueueLater(mv *MessageView, a
}
func (dpq *defaultProcessQueue) eraseMessage(mv *MessageView, consumeResult
ConsumerResult) {
- if consumeResult == SUCCESS {
+ consumeResult = dpq.convertSuspendResultIfNeeded(consumeResult)
+ if consumeResult.Type == ConsumerResultTypeSuccess ||
consumeResult.Type == ConsumerResultTypeSuspend {
dpq.consumer.consumptionOkQuantity.Inc()
dpq.ackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
} else {
dpq.consumer.consumptionErrorQuantity.Inc()
dpq.nackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
}
-
}
func (dpq *defaultProcessQueue) discardMessage(mv *MessageView) {
diff --git a/golang/protocol/v2/admin.pb.go b/golang/protocol/v2/admin.pb.go
index ac8d6d98..0a2c2610 100644
--- a/golang/protocol/v2/admin.pb.go
+++ b/golang/protocol/v2/admin.pb.go
@@ -15,18 +15,19 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.36.8
-// protoc v5.29.3
+// protoc-gen-go v1.36.11
+// protoc v7.34.0
// source: apache/rocketmq/v2/admin.proto
package v2
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/golang/protocol/v2/admin_grpc.pb.go
b/golang/protocol/v2/admin_grpc.pb.go
index 0ea7cc2f..4d2ba3a5 100644
--- a/golang/protocol/v2/admin_grpc.pb.go
+++ b/golang/protocol/v2/admin_grpc.pb.go
@@ -15,14 +15,15 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
-// - protoc-gen-go-grpc v1.5.1
-// - protoc v5.29.3
+// - protoc-gen-go-grpc v1.6.1
+// - protoc v7.34.0
// source: apache/rocketmq/v2/admin.proto
package v2
import (
context "context"
+
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -78,7 +79,7 @@ type AdminServer interface {
type UnimplementedAdminServer struct{}
func (UnimplementedAdminServer) ChangeLogLevel(context.Context,
*ChangeLogLevelRequest) (*ChangeLogLevelResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method ChangeLogLevel
not implemented")
+ return nil, status.Error(codes.Unimplemented, "method ChangeLogLevel
not implemented")
}
func (UnimplementedAdminServer) mustEmbedUnimplementedAdminServer() {}
func (UnimplementedAdminServer) testEmbeddedByValue() {}
@@ -91,7 +92,7 @@ type UnsafeAdminServer interface {
}
func RegisterAdminServer(s grpc.ServiceRegistrar, srv AdminServer) {
- // If the following call pancis, it indicates UnimplementedAdminServer
was
+ // If the following call panics, it indicates UnimplementedAdminServer
was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at
initialization
// time to prevent it from happening at runtime later due to I/O.
diff --git a/golang/protocol/v2/definition.pb.go
b/golang/protocol/v2/definition.pb.go
index 37977be8..4460b6cf 100644
--- a/golang/protocol/v2/definition.pb.go
+++ b/golang/protocol/v2/definition.pb.go
@@ -15,20 +15,21 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.36.8
-// protoc v5.29.3
+// protoc-gen-go v1.36.11
+// protoc v7.34.0
// source: apache/rocketmq/v2/definition.proto
package v2
import (
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
durationpb "google.golang.org/protobuf/types/known/durationpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
- reflect "reflect"
- sync "sync"
- unsafe "unsafe"
)
const (
@@ -306,6 +307,8 @@ const (
MessageType_TRANSACTION MessageType = 4
// lite topic
MessageType_LITE MessageType = 5
+ // Messages that lower prioritised ones may need to wait for higher
priority messages to be processed first
+ MessageType_PRIORITY MessageType = 6
)
// Enum value maps for MessageType.
@@ -317,6 +320,7 @@ var (
3: "DELAY",
4: "TRANSACTION",
5: "LITE",
+ 6: "PRIORITY",
}
MessageType_value = map[string]int32{
"MESSAGE_TYPE_UNSPECIFIED": 0,
@@ -325,6 +329,7 @@ var (
"DELAY": 3,
"TRANSACTION": 4,
"LITE": 5,
+ "PRIORITY": 6,
}
)
@@ -873,13 +878,9 @@ func (Language) EnumDescriptor() ([]byte, []int) {
type LiteSubscriptionAction int32
const (
- // incremental add
- LiteSubscriptionAction_PARTIAL_ADD LiteSubscriptionAction = 0
- // incremental remove
- LiteSubscriptionAction_PARTIAL_REMOVE LiteSubscriptionAction = 1
- // all add
- LiteSubscriptionAction_COMPLETE_ADD LiteSubscriptionAction = 2
- // add remove
+ LiteSubscriptionAction_PARTIAL_ADD LiteSubscriptionAction = 0
+ LiteSubscriptionAction_PARTIAL_REMOVE LiteSubscriptionAction = 1
+ LiteSubscriptionAction_COMPLETE_ADD LiteSubscriptionAction = 2
LiteSubscriptionAction_COMPLETE_REMOVE LiteSubscriptionAction = 3
)
@@ -978,6 +979,55 @@ func (QueryOffsetPolicy) EnumDescriptor() ([]byte, []int) {
return file_apache_rocketmq_v2_definition_proto_rawDescGZIP(), []int{12}
}
+type OffsetOption_Policy int32
+
+const (
+ OffsetOption_LAST OffsetOption_Policy = 0
+ OffsetOption_MIN OffsetOption_Policy = 1
+ OffsetOption_MAX OffsetOption_Policy = 2
+)
+
+// Enum value maps for OffsetOption_Policy.
+var (
+ OffsetOption_Policy_name = map[int32]string{
+ 0: "LAST",
+ 1: "MIN",
+ 2: "MAX",
+ }
+ OffsetOption_Policy_value = map[string]int32{
+ "LAST": 0,
+ "MIN": 1,
+ "MAX": 2,
+ }
+)
+
+func (x OffsetOption_Policy) Enum() *OffsetOption_Policy {
+ p := new(OffsetOption_Policy)
+ *p = x
+ return p
+}
+
+func (x OffsetOption_Policy) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(),
protoreflect.EnumNumber(x))
+}
+
+func (OffsetOption_Policy) Descriptor() protoreflect.EnumDescriptor {
+ return
file_apache_rocketmq_v2_definition_proto_enumTypes[13].Descriptor()
+}
+
+func (OffsetOption_Policy) Type() protoreflect.EnumType {
+ return &file_apache_rocketmq_v2_definition_proto_enumTypes[13]
+}
+
+func (x OffsetOption_Policy) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use OffsetOption_Policy.Descriptor instead.
+func (OffsetOption_Policy) EnumDescriptor() ([]byte, []int) {
+ return file_apache_rocketmq_v2_definition_proto_rawDescGZIP(),
[]int{21, 0}
+}
+
type FilterExpression struct {
state protoimpl.MessageState `protogen:"open.v1"`
Type FilterType
`protobuf:"varint,1,opt,name=type,proto3,enum=apache.rocketmq.v2.FilterType"
json:"type,omitempty"`
@@ -1700,7 +1750,9 @@ type SystemProperties struct {
// Information to identify whether this message is from dead letter
queue.
DeadLetterQueue *DeadLetterQueue
`protobuf:"bytes,20,opt,name=dead_letter_queue,json=deadLetterQueue,proto3,oneof"
json:"dead_letter_queue,omitempty"`
// lite topic
- LiteTopic *string
`protobuf:"bytes,21,opt,name=lite_topic,json=liteTopic,proto3,oneof"
json:"lite_topic,omitempty"`
+ LiteTopic *string
`protobuf:"bytes,21,opt,name=lite_topic,json=liteTopic,proto3,oneof"
json:"lite_topic,omitempty"`
+ // Priority of message, which is optional
+ Priority *int32
`protobuf:"varint,22,opt,name=priority,proto3,oneof" json:"priority,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1882,6 +1934,13 @@ func (x *SystemProperties) GetLiteTopic() string {
return ""
}
+func (x *SystemProperties) GetPriority() int32 {
+ if x != nil && x.Priority != nil {
+ return *x.Priority
+ }
+ return 0
+}
+
type DeadLetterQueue struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Original topic for this DLQ message.
@@ -2556,6 +2615,120 @@ func (x *Metric) GetEndpoints() *Endpoints {
return nil
}
+type OffsetOption struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ // Types that are valid to be assigned to OffsetType:
+ //
+ // *OffsetOption_Policy_
+ // *OffsetOption_Offset
+ // *OffsetOption_TailN
+ // *OffsetOption_Timestamp
+ OffsetType isOffsetOption_OffsetType `protobuf_oneof:"offset_type"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *OffsetOption) Reset() {
+ *x = OffsetOption{}
+ mi := &file_apache_rocketmq_v2_definition_proto_msgTypes[21]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *OffsetOption) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*OffsetOption) ProtoMessage() {}
+
+func (x *OffsetOption) ProtoReflect() protoreflect.Message {
+ mi := &file_apache_rocketmq_v2_definition_proto_msgTypes[21]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use OffsetOption.ProtoReflect.Descriptor instead.
+func (*OffsetOption) Descriptor() ([]byte, []int) {
+ return file_apache_rocketmq_v2_definition_proto_rawDescGZIP(), []int{21}
+}
+
+func (x *OffsetOption) GetOffsetType() isOffsetOption_OffsetType {
+ if x != nil {
+ return x.OffsetType
+ }
+ return nil
+}
+
+func (x *OffsetOption) GetPolicy() OffsetOption_Policy {
+ if x != nil {
+ if x, ok := x.OffsetType.(*OffsetOption_Policy_); ok {
+ return x.Policy
+ }
+ }
+ return OffsetOption_LAST
+}
+
+func (x *OffsetOption) GetOffset() int64 {
+ if x != nil {
+ if x, ok := x.OffsetType.(*OffsetOption_Offset); ok {
+ return x.Offset
+ }
+ }
+ return 0
+}
+
+func (x *OffsetOption) GetTailN() int64 {
+ if x != nil {
+ if x, ok := x.OffsetType.(*OffsetOption_TailN); ok {
+ return x.TailN
+ }
+ }
+ return 0
+}
+
+func (x *OffsetOption) GetTimestamp() int64 {
+ if x != nil {
+ if x, ok := x.OffsetType.(*OffsetOption_Timestamp); ok {
+ return x.Timestamp
+ }
+ }
+ return 0
+}
+
+type isOffsetOption_OffsetType interface {
+ isOffsetOption_OffsetType()
+}
+
+type OffsetOption_Policy_ struct {
+ Policy OffsetOption_Policy
`protobuf:"varint,1,opt,name=policy,proto3,enum=apache.rocketmq.v2.OffsetOption_Policy,oneof"`
+}
+
+type OffsetOption_Offset struct {
+ Offset int64 `protobuf:"varint,2,opt,name=offset,proto3,oneof"`
+}
+
+type OffsetOption_TailN struct {
+ TailN int64
`protobuf:"varint,3,opt,name=tail_n,json=tailN,proto3,oneof"`
+}
+
+type OffsetOption_Timestamp struct {
+ Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3,oneof"`
+}
+
+func (*OffsetOption_Policy_) isOffsetOption_OffsetType() {}
+
+func (*OffsetOption_Offset) isOffsetOption_OffsetType() {}
+
+func (*OffsetOption_TailN) isOffsetOption_OffsetType() {}
+
+func (*OffsetOption_Timestamp) isOffsetOption_OffsetType() {}
+
var File_apache_rocketmq_v2_definition_proto protoreflect.FileDescriptor
const file_apache_rocketmq_v2_definition_proto_rawDesc = "" +
@@ -2608,8 +2781,7 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc =
"" +
"\x14accept_message_types\x18\x05
\x03(\x0e2\x1f.apache.rocketmq.v2.MessageTypeR\x12acceptMessageTypes\"X\n" +
"\x06Digest\x122\n" +
"\x04type\x18\x01
\x01(\x0e2\x1e.apache.rocketmq.v2.DigestTypeR\x04type\x12\x1a\n" +
- "\bchecksum\x18\x02 \x01(\tR\bchecksum\"\xe1\n" +
- "\n" +
+ "\bchecksum\x18\x02 \x01(\tR\bchecksum\"\x8f\v\n" +
"\x10SystemProperties\x12\x15\n" +
"\x03tag\x18\x01 \x01(\tH\x00R\x03tag\x88\x01\x01\x12\x12\n" +
"\x04keys\x18\x02 \x03(\tR\x04keys\x12\x1d\n" +
@@ -2637,7 +2809,8 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc =
"" +
"\x11dead_letter_queue\x18\x14
\x01(\v2#.apache.rocketmq.v2.DeadLetterQueueH\n" +
"R\x0fdeadLetterQueue\x88\x01\x01\x12\"\n" +
"\n" +
- "lite_topic\x18\x15 \x01(\tH\vR\tliteTopic\x88\x01\x01B\x06\n" +
+ "lite_topic\x18\x15 \x01(\tH\vR\tliteTopic\x88\x01\x01\x12\x1f\n" +
+ "\bpriority\x18\x16 \x01(\x05H\fR\bpriority\x88\x01\x01B\x06\n" +
"\x04_tagB\x12\n" +
"\x10_store_timestampB\x15\n" +
"\x13_delivery_timestampB\x11\n" +
@@ -2649,7 +2822,8 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc =
"" +
"\x0e_trace_contextB)\n" +
"'_orphaned_transaction_recovery_durationB\x14\n" +
"\x12_dead_letter_queueB\r\n" +
- "\v_lite_topic\"F\n" +
+ "\v_lite_topicB\v\n" +
+ "\t_priority\"F\n" +
"\x0fDeadLetterQueue\x12\x14\n" +
"\x05topic\x18\x01 \x01(\tR\x05topic\x12\x1d\n" +
"\n" +
@@ -2714,7 +2888,17 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc =
"" +
"\x02on\x18\x01 \x01(\bR\x02on\x12@\n" +
"\tendpoints\x18\x02
\x01(\v2\x1d.apache.rocketmq.v2.EndpointsH\x00R\tendpoints\x88\x01\x01B\f\n" +
"\n" +
- "_endpoints*Y\n" +
+ "_endpoints\"\xd9\x01\n" +
+ "\fOffsetOption\x12A\n" +
+ "\x06policy\x18\x01
\x01(\x0e2'.apache.rocketmq.v2.OffsetOption.PolicyH\x00R\x06policy\x12\x18\n" +
+ "\x06offset\x18\x02 \x01(\x03H\x00R\x06offset\x12\x17\n" +
+ "\x06tail_n\x18\x03 \x01(\x03H\x00R\x05tailN\x12\x1e\n" +
+ "\ttimestamp\x18\x04 \x01(\x03H\x00R\ttimestamp\"$\n" +
+ "\x06Policy\x12\b\n" +
+ "\x04LAST\x10\x00\x12\a\n" +
+ "\x03MIN\x10\x01\x12\a\n" +
+ "\x03MAX\x10\x02B\r\n" +
+ "\voffset_type*Y\n" +
"\x15TransactionResolution\x12&\n" +
"\"TRANSACTION_RESOLUTION_UNSPECIFIED\x10\x00\x12\n" +
"\n" +
@@ -2741,7 +2925,7 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc =
"" +
"\x1aADDRESS_SCHEME_UNSPECIFIED\x10\x00\x12\b\n" +
"\x04IPv4\x10\x01\x12\b\n" +
"\x04IPv6\x10\x02\x12\x0f\n" +
- "\vDOMAIN_NAME\x10\x03*g\n" +
+ "\vDOMAIN_NAME\x10\x03*u\n" +
"\vMessageType\x12\x1c\n" +
"\x18MESSAGE_TYPE_UNSPECIFIED\x10\x00\x12\n" +
"\n" +
@@ -2749,7 +2933,8 @@ const file_apache_rocketmq_v2_definition_proto_rawDesc =
"" +
"\x04FIFO\x10\x02\x12\t\n" +
"\x05DELAY\x10\x03\x12\x0f\n" +
"\vTRANSACTION\x10\x04\x12\b\n" +
- "\x04LITE\x10\x05*G\n" +
+ "\x04LITE\x10\x05\x12\f\n" +
+ "\bPRIORITY\x10\x06*G\n" +
"\n" +
"DigestType\x12\x1b\n" +
"\x17DIGEST_TYPE_UNSPECIFIED\x10\x00\x12\t\n" +
@@ -2864,8 +3049,8 @@ func
file_apache_rocketmq_v2_definition_proto_rawDescGZIP() []byte {
return file_apache_rocketmq_v2_definition_proto_rawDescData
}
-var file_apache_rocketmq_v2_definition_proto_enumTypes =
make([]protoimpl.EnumInfo, 13)
-var file_apache_rocketmq_v2_definition_proto_msgTypes =
make([]protoimpl.MessageInfo, 22)
+var file_apache_rocketmq_v2_definition_proto_enumTypes =
make([]protoimpl.EnumInfo, 14)
+var file_apache_rocketmq_v2_definition_proto_msgTypes =
make([]protoimpl.MessageInfo, 23)
var file_apache_rocketmq_v2_definition_proto_goTypes = []any{
(TransactionResolution)(0), // 0:
apache.rocketmq.v2.TransactionResolution
(TransactionSource)(0), // 1:
apache.rocketmq.v2.TransactionSource
@@ -2880,81 +3065,84 @@ var file_apache_rocketmq_v2_definition_proto_goTypes =
[]any{
(Language)(0), // 10: apache.rocketmq.v2.Language
(LiteSubscriptionAction)(0), // 11:
apache.rocketmq.v2.LiteSubscriptionAction
(QueryOffsetPolicy)(0), // 12:
apache.rocketmq.v2.QueryOffsetPolicy
- (*FilterExpression)(nil), // 13:
apache.rocketmq.v2.FilterExpression
- (*RetryPolicy)(nil), // 14: apache.rocketmq.v2.RetryPolicy
- (*ExponentialBackoff)(nil), // 15:
apache.rocketmq.v2.ExponentialBackoff
- (*CustomizedBackoff)(nil), // 16:
apache.rocketmq.v2.CustomizedBackoff
- (*Resource)(nil), // 17: apache.rocketmq.v2.Resource
- (*SubscriptionEntry)(nil), // 18:
apache.rocketmq.v2.SubscriptionEntry
- (*Address)(nil), // 19: apache.rocketmq.v2.Address
- (*Endpoints)(nil), // 20: apache.rocketmq.v2.Endpoints
- (*Broker)(nil), // 21: apache.rocketmq.v2.Broker
- (*MessageQueue)(nil), // 22: apache.rocketmq.v2.MessageQueue
- (*Digest)(nil), // 23: apache.rocketmq.v2.Digest
- (*SystemProperties)(nil), // 24:
apache.rocketmq.v2.SystemProperties
- (*DeadLetterQueue)(nil), // 25: apache.rocketmq.v2.DeadLetterQueue
- (*Message)(nil), // 26: apache.rocketmq.v2.Message
- (*Assignment)(nil), // 27: apache.rocketmq.v2.Assignment
- (*Status)(nil), // 28: apache.rocketmq.v2.Status
- (*UA)(nil), // 29: apache.rocketmq.v2.UA
- (*Settings)(nil), // 30: apache.rocketmq.v2.Settings
- (*Publishing)(nil), // 31: apache.rocketmq.v2.Publishing
- (*Subscription)(nil), // 32: apache.rocketmq.v2.Subscription
- (*Metric)(nil), // 33: apache.rocketmq.v2.Metric
- nil, // 34:
apache.rocketmq.v2.Message.UserPropertiesEntry
- (*durationpb.Duration)(nil), // 35: google.protobuf.Duration
- (*timestamppb.Timestamp)(nil), // 36: google.protobuf.Timestamp
+ (OffsetOption_Policy)(0), // 13:
apache.rocketmq.v2.OffsetOption.Policy
+ (*FilterExpression)(nil), // 14:
apache.rocketmq.v2.FilterExpression
+ (*RetryPolicy)(nil), // 15: apache.rocketmq.v2.RetryPolicy
+ (*ExponentialBackoff)(nil), // 16:
apache.rocketmq.v2.ExponentialBackoff
+ (*CustomizedBackoff)(nil), // 17:
apache.rocketmq.v2.CustomizedBackoff
+ (*Resource)(nil), // 18: apache.rocketmq.v2.Resource
+ (*SubscriptionEntry)(nil), // 19:
apache.rocketmq.v2.SubscriptionEntry
+ (*Address)(nil), // 20: apache.rocketmq.v2.Address
+ (*Endpoints)(nil), // 21: apache.rocketmq.v2.Endpoints
+ (*Broker)(nil), // 22: apache.rocketmq.v2.Broker
+ (*MessageQueue)(nil), // 23: apache.rocketmq.v2.MessageQueue
+ (*Digest)(nil), // 24: apache.rocketmq.v2.Digest
+ (*SystemProperties)(nil), // 25:
apache.rocketmq.v2.SystemProperties
+ (*DeadLetterQueue)(nil), // 26: apache.rocketmq.v2.DeadLetterQueue
+ (*Message)(nil), // 27: apache.rocketmq.v2.Message
+ (*Assignment)(nil), // 28: apache.rocketmq.v2.Assignment
+ (*Status)(nil), // 29: apache.rocketmq.v2.Status
+ (*UA)(nil), // 30: apache.rocketmq.v2.UA
+ (*Settings)(nil), // 31: apache.rocketmq.v2.Settings
+ (*Publishing)(nil), // 32: apache.rocketmq.v2.Publishing
+ (*Subscription)(nil), // 33: apache.rocketmq.v2.Subscription
+ (*Metric)(nil), // 34: apache.rocketmq.v2.Metric
+ (*OffsetOption)(nil), // 35: apache.rocketmq.v2.OffsetOption
+ nil, // 36:
apache.rocketmq.v2.Message.UserPropertiesEntry
+ (*durationpb.Duration)(nil), // 37: google.protobuf.Duration
+ (*timestamppb.Timestamp)(nil), // 38: google.protobuf.Timestamp
}
var file_apache_rocketmq_v2_definition_proto_depIdxs = []int32{
3, // 0: apache.rocketmq.v2.FilterExpression.type:type_name ->
apache.rocketmq.v2.FilterType
- 15, // 1: apache.rocketmq.v2.RetryPolicy.exponential_backoff:type_name
-> apache.rocketmq.v2.ExponentialBackoff
- 16, // 2: apache.rocketmq.v2.RetryPolicy.customized_backoff:type_name
-> apache.rocketmq.v2.CustomizedBackoff
- 35, // 3: apache.rocketmq.v2.ExponentialBackoff.initial:type_name ->
google.protobuf.Duration
- 35, // 4: apache.rocketmq.v2.ExponentialBackoff.max:type_name ->
google.protobuf.Duration
- 35, // 5: apache.rocketmq.v2.CustomizedBackoff.next:type_name ->
google.protobuf.Duration
- 17, // 6: apache.rocketmq.v2.SubscriptionEntry.topic:type_name ->
apache.rocketmq.v2.Resource
- 13, // 7: apache.rocketmq.v2.SubscriptionEntry.expression:type_name ->
apache.rocketmq.v2.FilterExpression
+ 16, // 1: apache.rocketmq.v2.RetryPolicy.exponential_backoff:type_name
-> apache.rocketmq.v2.ExponentialBackoff
+ 17, // 2: apache.rocketmq.v2.RetryPolicy.customized_backoff:type_name
-> apache.rocketmq.v2.CustomizedBackoff
+ 37, // 3: apache.rocketmq.v2.ExponentialBackoff.initial:type_name ->
google.protobuf.Duration
+ 37, // 4: apache.rocketmq.v2.ExponentialBackoff.max:type_name ->
google.protobuf.Duration
+ 37, // 5: apache.rocketmq.v2.CustomizedBackoff.next:type_name ->
google.protobuf.Duration
+ 18, // 6: apache.rocketmq.v2.SubscriptionEntry.topic:type_name ->
apache.rocketmq.v2.Resource
+ 14, // 7: apache.rocketmq.v2.SubscriptionEntry.expression:type_name ->
apache.rocketmq.v2.FilterExpression
4, // 8: apache.rocketmq.v2.Endpoints.scheme:type_name ->
apache.rocketmq.v2.AddressScheme
- 19, // 9: apache.rocketmq.v2.Endpoints.addresses:type_name ->
apache.rocketmq.v2.Address
- 20, // 10: apache.rocketmq.v2.Broker.endpoints:type_name ->
apache.rocketmq.v2.Endpoints
- 17, // 11: apache.rocketmq.v2.MessageQueue.topic:type_name ->
apache.rocketmq.v2.Resource
+ 20, // 9: apache.rocketmq.v2.Endpoints.addresses:type_name ->
apache.rocketmq.v2.Address
+ 21, // 10: apache.rocketmq.v2.Broker.endpoints:type_name ->
apache.rocketmq.v2.Endpoints
+ 18, // 11: apache.rocketmq.v2.MessageQueue.topic:type_name ->
apache.rocketmq.v2.Resource
2, // 12: apache.rocketmq.v2.MessageQueue.permission:type_name ->
apache.rocketmq.v2.Permission
- 21, // 13: apache.rocketmq.v2.MessageQueue.broker:type_name ->
apache.rocketmq.v2.Broker
+ 22, // 13: apache.rocketmq.v2.MessageQueue.broker:type_name ->
apache.rocketmq.v2.Broker
5, // 14:
apache.rocketmq.v2.MessageQueue.accept_message_types:type_name ->
apache.rocketmq.v2.MessageType
6, // 15: apache.rocketmq.v2.Digest.type:type_name ->
apache.rocketmq.v2.DigestType
- 23, // 16: apache.rocketmq.v2.SystemProperties.body_digest:type_name ->
apache.rocketmq.v2.Digest
+ 24, // 16: apache.rocketmq.v2.SystemProperties.body_digest:type_name ->
apache.rocketmq.v2.Digest
8, // 17: apache.rocketmq.v2.SystemProperties.body_encoding:type_name
-> apache.rocketmq.v2.Encoding
5, // 18: apache.rocketmq.v2.SystemProperties.message_type:type_name
-> apache.rocketmq.v2.MessageType
- 36, // 19: apache.rocketmq.v2.SystemProperties.born_timestamp:type_name
-> google.protobuf.Timestamp
- 36, // 20:
apache.rocketmq.v2.SystemProperties.store_timestamp:type_name ->
google.protobuf.Timestamp
- 36, // 21:
apache.rocketmq.v2.SystemProperties.delivery_timestamp:type_name ->
google.protobuf.Timestamp
- 35, // 22:
apache.rocketmq.v2.SystemProperties.invisible_duration:type_name ->
google.protobuf.Duration
- 35, // 23:
apache.rocketmq.v2.SystemProperties.orphaned_transaction_recovery_duration:type_name
-> google.protobuf.Duration
- 25, // 24:
apache.rocketmq.v2.SystemProperties.dead_letter_queue:type_name ->
apache.rocketmq.v2.DeadLetterQueue
- 17, // 25: apache.rocketmq.v2.Message.topic:type_name ->
apache.rocketmq.v2.Resource
- 34, // 26: apache.rocketmq.v2.Message.user_properties:type_name ->
apache.rocketmq.v2.Message.UserPropertiesEntry
- 24, // 27: apache.rocketmq.v2.Message.system_properties:type_name ->
apache.rocketmq.v2.SystemProperties
- 22, // 28: apache.rocketmq.v2.Assignment.message_queue:type_name ->
apache.rocketmq.v2.MessageQueue
+ 38, // 19: apache.rocketmq.v2.SystemProperties.born_timestamp:type_name
-> google.protobuf.Timestamp
+ 38, // 20:
apache.rocketmq.v2.SystemProperties.store_timestamp:type_name ->
google.protobuf.Timestamp
+ 38, // 21:
apache.rocketmq.v2.SystemProperties.delivery_timestamp:type_name ->
google.protobuf.Timestamp
+ 37, // 22:
apache.rocketmq.v2.SystemProperties.invisible_duration:type_name ->
google.protobuf.Duration
+ 37, // 23:
apache.rocketmq.v2.SystemProperties.orphaned_transaction_recovery_duration:type_name
-> google.protobuf.Duration
+ 26, // 24:
apache.rocketmq.v2.SystemProperties.dead_letter_queue:type_name ->
apache.rocketmq.v2.DeadLetterQueue
+ 18, // 25: apache.rocketmq.v2.Message.topic:type_name ->
apache.rocketmq.v2.Resource
+ 36, // 26: apache.rocketmq.v2.Message.user_properties:type_name ->
apache.rocketmq.v2.Message.UserPropertiesEntry
+ 25, // 27: apache.rocketmq.v2.Message.system_properties:type_name ->
apache.rocketmq.v2.SystemProperties
+ 23, // 28: apache.rocketmq.v2.Assignment.message_queue:type_name ->
apache.rocketmq.v2.MessageQueue
9, // 29: apache.rocketmq.v2.Status.code:type_name ->
apache.rocketmq.v2.Code
10, // 30: apache.rocketmq.v2.UA.language:type_name ->
apache.rocketmq.v2.Language
7, // 31: apache.rocketmq.v2.Settings.client_type:type_name ->
apache.rocketmq.v2.ClientType
- 20, // 32: apache.rocketmq.v2.Settings.access_point:type_name ->
apache.rocketmq.v2.Endpoints
- 14, // 33: apache.rocketmq.v2.Settings.backoff_policy:type_name ->
apache.rocketmq.v2.RetryPolicy
- 35, // 34: apache.rocketmq.v2.Settings.request_timeout:type_name ->
google.protobuf.Duration
- 31, // 35: apache.rocketmq.v2.Settings.publishing:type_name ->
apache.rocketmq.v2.Publishing
- 32, // 36: apache.rocketmq.v2.Settings.subscription:type_name ->
apache.rocketmq.v2.Subscription
- 29, // 37: apache.rocketmq.v2.Settings.user_agent:type_name ->
apache.rocketmq.v2.UA
- 33, // 38: apache.rocketmq.v2.Settings.metric:type_name ->
apache.rocketmq.v2.Metric
- 17, // 39: apache.rocketmq.v2.Publishing.topics:type_name ->
apache.rocketmq.v2.Resource
- 17, // 40: apache.rocketmq.v2.Subscription.group:type_name ->
apache.rocketmq.v2.Resource
- 18, // 41: apache.rocketmq.v2.Subscription.subscriptions:type_name ->
apache.rocketmq.v2.SubscriptionEntry
- 35, // 42:
apache.rocketmq.v2.Subscription.long_polling_timeout:type_name ->
google.protobuf.Duration
- 20, // 43: apache.rocketmq.v2.Metric.endpoints:type_name ->
apache.rocketmq.v2.Endpoints
- 44, // [44:44] is the sub-list for method output_type
- 44, // [44:44] is the sub-list for method input_type
- 44, // [44:44] is the sub-list for extension type_name
- 44, // [44:44] is the sub-list for extension extendee
- 0, // [0:44] is the sub-list for field type_name
+ 21, // 32: apache.rocketmq.v2.Settings.access_point:type_name ->
apache.rocketmq.v2.Endpoints
+ 15, // 33: apache.rocketmq.v2.Settings.backoff_policy:type_name ->
apache.rocketmq.v2.RetryPolicy
+ 37, // 34: apache.rocketmq.v2.Settings.request_timeout:type_name ->
google.protobuf.Duration
+ 32, // 35: apache.rocketmq.v2.Settings.publishing:type_name ->
apache.rocketmq.v2.Publishing
+ 33, // 36: apache.rocketmq.v2.Settings.subscription:type_name ->
apache.rocketmq.v2.Subscription
+ 30, // 37: apache.rocketmq.v2.Settings.user_agent:type_name ->
apache.rocketmq.v2.UA
+ 34, // 38: apache.rocketmq.v2.Settings.metric:type_name ->
apache.rocketmq.v2.Metric
+ 18, // 39: apache.rocketmq.v2.Publishing.topics:type_name ->
apache.rocketmq.v2.Resource
+ 18, // 40: apache.rocketmq.v2.Subscription.group:type_name ->
apache.rocketmq.v2.Resource
+ 19, // 41: apache.rocketmq.v2.Subscription.subscriptions:type_name ->
apache.rocketmq.v2.SubscriptionEntry
+ 37, // 42:
apache.rocketmq.v2.Subscription.long_polling_timeout:type_name ->
google.protobuf.Duration
+ 21, // 43: apache.rocketmq.v2.Metric.endpoints:type_name ->
apache.rocketmq.v2.Endpoints
+ 13, // 44: apache.rocketmq.v2.OffsetOption.policy:type_name ->
apache.rocketmq.v2.OffsetOption.Policy
+ 45, // [45:45] is the sub-list for method output_type
+ 45, // [45:45] is the sub-list for method input_type
+ 45, // [45:45] is the sub-list for extension type_name
+ 45, // [45:45] is the sub-list for extension extendee
+ 0, // [0:45] is the sub-list for field type_name
}
func init() { file_apache_rocketmq_v2_definition_proto_init() }
@@ -2973,13 +3161,19 @@ func file_apache_rocketmq_v2_definition_proto_init() {
}
file_apache_rocketmq_v2_definition_proto_msgTypes[19].OneofWrappers =
[]any{}
file_apache_rocketmq_v2_definition_proto_msgTypes[20].OneofWrappers =
[]any{}
+ file_apache_rocketmq_v2_definition_proto_msgTypes[21].OneofWrappers =
[]any{
+ (*OffsetOption_Policy_)(nil),
+ (*OffsetOption_Offset)(nil),
+ (*OffsetOption_TailN)(nil),
+ (*OffsetOption_Timestamp)(nil),
+ }
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor:
unsafe.Slice(unsafe.StringData(file_apache_rocketmq_v2_definition_proto_rawDesc),
len(file_apache_rocketmq_v2_definition_proto_rawDesc)),
- NumEnums: 13,
- NumMessages: 22,
+ NumEnums: 14,
+ NumMessages: 23,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/golang/protocol/v2/service.pb.go b/golang/protocol/v2/service.pb.go
index d2895eb5..cdf05778 100644
--- a/golang/protocol/v2/service.pb.go
+++ b/golang/protocol/v2/service.pb.go
@@ -15,20 +15,21 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.36.8
-// protoc v5.29.3
+// protoc-gen-go v1.36.11
+// protoc v7.34.0
// source: apache/rocketmq/v2/service.proto
package v2
import (
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
durationpb "google.golang.org/protobuf/types/known/durationpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
- reflect "reflect"
- sync "sync"
- unsafe "unsafe"
)
const (
@@ -1872,7 +1873,10 @@ type ChangeInvisibleDurationRequest struct {
// New invisible duration
InvisibleDuration *durationpb.Duration
`protobuf:"bytes,4,opt,name=invisible_duration,json=invisibleDuration,proto3"
json:"invisible_duration,omitempty"`
// For message tracing
- MessageId string
`protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3"
json:"message_id,omitempty"`
+ MessageId string
`protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3"
json:"message_id,omitempty"`
+ LiteTopic *string
`protobuf:"bytes,6,opt,name=lite_topic,json=liteTopic,proto3,oneof"
json:"lite_topic,omitempty"`
+ // If true, server will not increment the retry times for this message
+ Suspend *bool `protobuf:"varint,7,opt,name=suspend,proto3,oneof"
json:"suspend,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1942,6 +1946,20 @@ func (x *ChangeInvisibleDurationRequest) GetMessageId()
string {
return ""
}
+func (x *ChangeInvisibleDurationRequest) GetLiteTopic() string {
+ if x != nil && x.LiteTopic != nil {
+ return *x.LiteTopic
+ }
+ return ""
+}
+
+func (x *ChangeInvisibleDurationRequest) GetSuspend() bool {
+ if x != nil && x.Suspend != nil {
+ return *x.Suspend
+ }
+ return false
+}
+
type ChangeInvisibleDurationResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Status *Status
`protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
@@ -2610,8 +2628,9 @@ type SyncLiteSubscriptionRequest struct {
// consumer group
Group *Resource `protobuf:"bytes,3,opt,name=group,proto3"
json:"group,omitempty"`
// lite subscription set of lite topics
- LiteTopicSet []string
`protobuf:"bytes,4,rep,name=lite_topic_set,json=liteTopicSet,proto3"
json:"lite_topic_set,omitempty"`
- Version *int64
`protobuf:"varint,5,opt,name=version,proto3,oneof" json:"version,omitempty"`
+ LiteTopicSet []string
`protobuf:"bytes,4,rep,name=lite_topic_set,json=liteTopicSet,proto3"
json:"lite_topic_set,omitempty"`
+ Version *int64
`protobuf:"varint,5,opt,name=version,proto3,oneof" json:"version,omitempty"`
+ OffsetOption *OffsetOption
`protobuf:"bytes,6,opt,name=offset_option,json=offsetOption,proto3,oneof"
json:"offset_option,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -2681,6 +2700,13 @@ func (x *SyncLiteSubscriptionRequest) GetVersion() int64
{
return 0
}
+func (x *SyncLiteSubscriptionRequest) GetOffsetOption() *OffsetOption {
+ if x != nil {
+ return x.OffsetOption
+ }
+ return nil
+}
+
type SyncLiteSubscriptionResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Status *Status
`protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
@@ -2861,14 +2887,20 @@ const file_apache_rocketmq_v2_service_proto_rawDesc =
"" +
"\x05group\x18\x01
\x01(\v2\x1c.apache.rocketmq.v2.ResourceH\x00R\x05group\x88\x01\x01B\b\n" +
"\x06_group\"U\n" +
"\x1fNotifyClientTerminationResponse\x122\n" +
- "\x06status\x18\x01
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\"\x98\x02\n" +
+ "\x06status\x18\x01
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\"\xf6\x02\n" +
"\x1eChangeInvisibleDurationRequest\x122\n" +
"\x05group\x18\x01
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05group\x122\n" +
"\x05topic\x18\x02
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05topic\x12%\n" +
"\x0ereceipt_handle\x18\x03 \x01(\tR\rreceiptHandle\x12H\n" +
"\x12invisible_duration\x18\x04
\x01(\v2\x19.google.protobuf.DurationR\x11invisibleDuration\x12\x1d\n" +
"\n" +
- "message_id\x18\x05 \x01(\tR\tmessageId\"|\n" +
+ "message_id\x18\x05 \x01(\tR\tmessageId\x12\"\n" +
+ "\n" +
+ "lite_topic\x18\x06 \x01(\tH\x00R\tliteTopic\x88\x01\x01\x12\x1d\n" +
+ "\asuspend\x18\a \x01(\bH\x01R\asuspend\x88\x01\x01B\r\n" +
+ "\v_lite_topicB\n" +
+ "\n" +
+ "\b_suspend\"|\n" +
"\x1fChangeInvisibleDurationResponse\x122\n" +
"\x06status\x18\x01
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\x12%\n" +
"\x0ereceipt_handle\x18\x02 \x01(\tR\rreceiptHandle\"\xe6\x02\n" +
@@ -2913,15 +2945,17 @@ const file_apache_rocketmq_v2_service_proto_rawDesc =
"" +
"\x15RecallMessageResponse\x122\n" +
"\x06status\x18\x01
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status\x12\x1d\n" +
"\n" +
- "message_id\x18\x02 \x01(\tR\tmessageId\"\x9a\x02\n" +
+ "message_id\x18\x02 \x01(\tR\tmessageId\"\xf8\x02\n" +
"\x1bSyncLiteSubscriptionRequest\x12B\n" +
"\x06action\x18\x01
\x01(\x0e2*.apache.rocketmq.v2.LiteSubscriptionActionR\x06action\x122\n" +
"\x05topic\x18\x02
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05topic\x122\n" +
"\x05group\x18\x03
\x01(\v2\x1c.apache.rocketmq.v2.ResourceR\x05group\x12$\n" +
"\x0elite_topic_set\x18\x04 \x03(\tR\fliteTopicSet\x12\x1d\n" +
- "\aversion\x18\x05 \x01(\x03H\x00R\aversion\x88\x01\x01B\n" +
+ "\aversion\x18\x05 \x01(\x03H\x00R\aversion\x88\x01\x01\x12J\n" +
+ "\roffset_option\x18\x06 \x01(\v2
.apache.rocketmq.v2.OffsetOptionH\x01R\foffsetOption\x88\x01\x01B\n" +
"\n" +
- "\b_version\"R\n" +
+ "\b_versionB\x10\n" +
+ "\x0e_offset_option\"R\n" +
"\x1cSyncLiteSubscriptionResponse\x122\n" +
"\x06status\x18\x01
\x01(\v2\x1a.apache.rocketmq.v2.StatusR\x06status2\xcc\x0e\n" +
"\x10MessagingService\x12]\n" +
@@ -3018,6 +3052,7 @@ var file_apache_rocketmq_v2_service_proto_goTypes = []any{
(*Settings)(nil), // 55:
apache.rocketmq.v2.Settings
(QueryOffsetPolicy)(0), // 56:
apache.rocketmq.v2.QueryOffsetPolicy
(LiteSubscriptionAction)(0), // 57:
apache.rocketmq.v2.LiteSubscriptionAction
+ (*OffsetOption)(nil), // 58:
apache.rocketmq.v2.OffsetOption
}
var file_apache_rocketmq_v2_service_proto_depIdxs = []int32{
43, // 0: apache.rocketmq.v2.QueryRouteRequest.topic:type_name ->
apache.rocketmq.v2.Resource
@@ -3095,46 +3130,47 @@ var file_apache_rocketmq_v2_service_proto_depIdxs =
[]int32{
57, // 72:
apache.rocketmq.v2.SyncLiteSubscriptionRequest.action:type_name ->
apache.rocketmq.v2.LiteSubscriptionAction
43, // 73:
apache.rocketmq.v2.SyncLiteSubscriptionRequest.topic:type_name ->
apache.rocketmq.v2.Resource
43, // 74:
apache.rocketmq.v2.SyncLiteSubscriptionRequest.group:type_name ->
apache.rocketmq.v2.Resource
- 45, // 75:
apache.rocketmq.v2.SyncLiteSubscriptionResponse.status:type_name ->
apache.rocketmq.v2.Status
- 0, // 76: apache.rocketmq.v2.MessagingService.QueryRoute:input_type ->
apache.rocketmq.v2.QueryRouteRequest
- 15, // 77: apache.rocketmq.v2.MessagingService.Heartbeat:input_type ->
apache.rocketmq.v2.HeartbeatRequest
- 2, // 78: apache.rocketmq.v2.MessagingService.SendMessage:input_type
-> apache.rocketmq.v2.SendMessageRequest
- 5, // 79:
apache.rocketmq.v2.MessagingService.QueryAssignment:input_type ->
apache.rocketmq.v2.QueryAssignmentRequest
- 7, // 80:
apache.rocketmq.v2.MessagingService.ReceiveMessage:input_type ->
apache.rocketmq.v2.ReceiveMessageRequest
- 10, // 81: apache.rocketmq.v2.MessagingService.AckMessage:input_type ->
apache.rocketmq.v2.AckMessageRequest
- 13, // 82:
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:input_type
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest
- 31, // 83: apache.rocketmq.v2.MessagingService.PullMessage:input_type
-> apache.rocketmq.v2.PullMessageRequest
- 33, // 84: apache.rocketmq.v2.MessagingService.UpdateOffset:input_type
-> apache.rocketmq.v2.UpdateOffsetRequest
- 35, // 85: apache.rocketmq.v2.MessagingService.GetOffset:input_type ->
apache.rocketmq.v2.GetOffsetRequest
- 37, // 86: apache.rocketmq.v2.MessagingService.QueryOffset:input_type
-> apache.rocketmq.v2.QueryOffsetRequest
- 17, // 87:
apache.rocketmq.v2.MessagingService.EndTransaction:input_type ->
apache.rocketmq.v2.EndTransactionRequest
- 26, // 88: apache.rocketmq.v2.MessagingService.Telemetry:input_type ->
apache.rocketmq.v2.TelemetryCommand
- 27, // 89:
apache.rocketmq.v2.MessagingService.NotifyClientTermination:input_type ->
apache.rocketmq.v2.NotifyClientTerminationRequest
- 29, // 90:
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:input_type ->
apache.rocketmq.v2.ChangeInvisibleDurationRequest
- 39, // 91: apache.rocketmq.v2.MessagingService.RecallMessage:input_type
-> apache.rocketmq.v2.RecallMessageRequest
- 41, // 92:
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:input_type ->
apache.rocketmq.v2.SyncLiteSubscriptionRequest
- 1, // 93: apache.rocketmq.v2.MessagingService.QueryRoute:output_type
-> apache.rocketmq.v2.QueryRouteResponse
- 16, // 94: apache.rocketmq.v2.MessagingService.Heartbeat:output_type ->
apache.rocketmq.v2.HeartbeatResponse
- 4, // 95: apache.rocketmq.v2.MessagingService.SendMessage:output_type
-> apache.rocketmq.v2.SendMessageResponse
- 6, // 96:
apache.rocketmq.v2.MessagingService.QueryAssignment:output_type ->
apache.rocketmq.v2.QueryAssignmentResponse
- 8, // 97:
apache.rocketmq.v2.MessagingService.ReceiveMessage:output_type ->
apache.rocketmq.v2.ReceiveMessageResponse
- 12, // 98: apache.rocketmq.v2.MessagingService.AckMessage:output_type
-> apache.rocketmq.v2.AckMessageResponse
- 14, // 99:
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:output_type
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse
- 32, // 100: apache.rocketmq.v2.MessagingService.PullMessage:output_type
-> apache.rocketmq.v2.PullMessageResponse
- 34, // 101:
apache.rocketmq.v2.MessagingService.UpdateOffset:output_type ->
apache.rocketmq.v2.UpdateOffsetResponse
- 36, // 102: apache.rocketmq.v2.MessagingService.GetOffset:output_type
-> apache.rocketmq.v2.GetOffsetResponse
- 38, // 103: apache.rocketmq.v2.MessagingService.QueryOffset:output_type
-> apache.rocketmq.v2.QueryOffsetResponse
- 18, // 104:
apache.rocketmq.v2.MessagingService.EndTransaction:output_type ->
apache.rocketmq.v2.EndTransactionResponse
- 26, // 105: apache.rocketmq.v2.MessagingService.Telemetry:output_type
-> apache.rocketmq.v2.TelemetryCommand
- 28, // 106:
apache.rocketmq.v2.MessagingService.NotifyClientTermination:output_type ->
apache.rocketmq.v2.NotifyClientTerminationResponse
- 30, // 107:
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:output_type ->
apache.rocketmq.v2.ChangeInvisibleDurationResponse
- 40, // 108:
apache.rocketmq.v2.MessagingService.RecallMessage:output_type ->
apache.rocketmq.v2.RecallMessageResponse
- 42, // 109:
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:output_type ->
apache.rocketmq.v2.SyncLiteSubscriptionResponse
- 93, // [93:110] is the sub-list for method output_type
- 76, // [76:93] is the sub-list for method input_type
- 76, // [76:76] is the sub-list for extension type_name
- 76, // [76:76] is the sub-list for extension extendee
- 0, // [0:76] is the sub-list for field type_name
+ 58, // 75:
apache.rocketmq.v2.SyncLiteSubscriptionRequest.offset_option:type_name ->
apache.rocketmq.v2.OffsetOption
+ 45, // 76:
apache.rocketmq.v2.SyncLiteSubscriptionResponse.status:type_name ->
apache.rocketmq.v2.Status
+ 0, // 77: apache.rocketmq.v2.MessagingService.QueryRoute:input_type ->
apache.rocketmq.v2.QueryRouteRequest
+ 15, // 78: apache.rocketmq.v2.MessagingService.Heartbeat:input_type ->
apache.rocketmq.v2.HeartbeatRequest
+ 2, // 79: apache.rocketmq.v2.MessagingService.SendMessage:input_type
-> apache.rocketmq.v2.SendMessageRequest
+ 5, // 80:
apache.rocketmq.v2.MessagingService.QueryAssignment:input_type ->
apache.rocketmq.v2.QueryAssignmentRequest
+ 7, // 81:
apache.rocketmq.v2.MessagingService.ReceiveMessage:input_type ->
apache.rocketmq.v2.ReceiveMessageRequest
+ 10, // 82: apache.rocketmq.v2.MessagingService.AckMessage:input_type ->
apache.rocketmq.v2.AckMessageRequest
+ 13, // 83:
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:input_type
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest
+ 31, // 84: apache.rocketmq.v2.MessagingService.PullMessage:input_type
-> apache.rocketmq.v2.PullMessageRequest
+ 33, // 85: apache.rocketmq.v2.MessagingService.UpdateOffset:input_type
-> apache.rocketmq.v2.UpdateOffsetRequest
+ 35, // 86: apache.rocketmq.v2.MessagingService.GetOffset:input_type ->
apache.rocketmq.v2.GetOffsetRequest
+ 37, // 87: apache.rocketmq.v2.MessagingService.QueryOffset:input_type
-> apache.rocketmq.v2.QueryOffsetRequest
+ 17, // 88:
apache.rocketmq.v2.MessagingService.EndTransaction:input_type ->
apache.rocketmq.v2.EndTransactionRequest
+ 26, // 89: apache.rocketmq.v2.MessagingService.Telemetry:input_type ->
apache.rocketmq.v2.TelemetryCommand
+ 27, // 90:
apache.rocketmq.v2.MessagingService.NotifyClientTermination:input_type ->
apache.rocketmq.v2.NotifyClientTerminationRequest
+ 29, // 91:
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:input_type ->
apache.rocketmq.v2.ChangeInvisibleDurationRequest
+ 39, // 92: apache.rocketmq.v2.MessagingService.RecallMessage:input_type
-> apache.rocketmq.v2.RecallMessageRequest
+ 41, // 93:
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:input_type ->
apache.rocketmq.v2.SyncLiteSubscriptionRequest
+ 1, // 94: apache.rocketmq.v2.MessagingService.QueryRoute:output_type
-> apache.rocketmq.v2.QueryRouteResponse
+ 16, // 95: apache.rocketmq.v2.MessagingService.Heartbeat:output_type ->
apache.rocketmq.v2.HeartbeatResponse
+ 4, // 96: apache.rocketmq.v2.MessagingService.SendMessage:output_type
-> apache.rocketmq.v2.SendMessageResponse
+ 6, // 97:
apache.rocketmq.v2.MessagingService.QueryAssignment:output_type ->
apache.rocketmq.v2.QueryAssignmentResponse
+ 8, // 98:
apache.rocketmq.v2.MessagingService.ReceiveMessage:output_type ->
apache.rocketmq.v2.ReceiveMessageResponse
+ 12, // 99: apache.rocketmq.v2.MessagingService.AckMessage:output_type
-> apache.rocketmq.v2.AckMessageResponse
+ 14, // 100:
apache.rocketmq.v2.MessagingService.ForwardMessageToDeadLetterQueue:output_type
-> apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse
+ 32, // 101: apache.rocketmq.v2.MessagingService.PullMessage:output_type
-> apache.rocketmq.v2.PullMessageResponse
+ 34, // 102:
apache.rocketmq.v2.MessagingService.UpdateOffset:output_type ->
apache.rocketmq.v2.UpdateOffsetResponse
+ 36, // 103: apache.rocketmq.v2.MessagingService.GetOffset:output_type
-> apache.rocketmq.v2.GetOffsetResponse
+ 38, // 104: apache.rocketmq.v2.MessagingService.QueryOffset:output_type
-> apache.rocketmq.v2.QueryOffsetResponse
+ 18, // 105:
apache.rocketmq.v2.MessagingService.EndTransaction:output_type ->
apache.rocketmq.v2.EndTransactionResponse
+ 26, // 106: apache.rocketmq.v2.MessagingService.Telemetry:output_type
-> apache.rocketmq.v2.TelemetryCommand
+ 28, // 107:
apache.rocketmq.v2.MessagingService.NotifyClientTermination:output_type ->
apache.rocketmq.v2.NotifyClientTerminationResponse
+ 30, // 108:
apache.rocketmq.v2.MessagingService.ChangeInvisibleDuration:output_type ->
apache.rocketmq.v2.ChangeInvisibleDurationResponse
+ 40, // 109:
apache.rocketmq.v2.MessagingService.RecallMessage:output_type ->
apache.rocketmq.v2.RecallMessageResponse
+ 42, // 110:
apache.rocketmq.v2.MessagingService.SyncLiteSubscription:output_type ->
apache.rocketmq.v2.SyncLiteSubscriptionResponse
+ 94, // [94:111] is the sub-list for method output_type
+ 77, // [77:94] is the sub-list for method input_type
+ 77, // [77:77] is the sub-list for extension type_name
+ 77, // [77:77] is the sub-list for extension extendee
+ 0, // [0:77] is the sub-list for field type_name
}
func init() { file_apache_rocketmq_v2_service_proto_init() }
@@ -3164,6 +3200,7 @@ func file_apache_rocketmq_v2_service_proto_init() {
(*TelemetryCommand_NotifyUnsubscribeLiteCommand)(nil),
}
file_apache_rocketmq_v2_service_proto_msgTypes[27].OneofWrappers =
[]any{}
+ file_apache_rocketmq_v2_service_proto_msgTypes[29].OneofWrappers =
[]any{}
file_apache_rocketmq_v2_service_proto_msgTypes[32].OneofWrappers =
[]any{
(*PullMessageResponse_Status)(nil),
(*PullMessageResponse_Message)(nil),
diff --git a/golang/protocol/v2/service_grpc.pb.go
b/golang/protocol/v2/service_grpc.pb.go
index b6ecd418..61073218 100644
--- a/golang/protocol/v2/service_grpc.pb.go
+++ b/golang/protocol/v2/service_grpc.pb.go
@@ -15,14 +15,15 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
-// - protoc-gen-go-grpc v1.5.1
-// - protoc v5.29.3
+// - protoc-gen-go-grpc v1.6.1
+// - protoc v7.34.0
// source: apache/rocketmq/v2/service.proto
package v2
import (
context "context"
+
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -491,55 +492,55 @@ type MessagingServiceServer interface {
type UnimplementedMessagingServiceServer struct{}
func (UnimplementedMessagingServiceServer) QueryRoute(context.Context,
*QueryRouteRequest) (*QueryRouteResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method QueryRoute not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method QueryRoute not
implemented")
}
func (UnimplementedMessagingServiceServer) Heartbeat(context.Context,
*HeartbeatRequest) (*HeartbeatResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method Heartbeat not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method Heartbeat not
implemented")
}
func (UnimplementedMessagingServiceServer) SendMessage(context.Context,
*SendMessageRequest) (*SendMessageResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method SendMessage not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method SendMessage not
implemented")
}
func (UnimplementedMessagingServiceServer) QueryAssignment(context.Context,
*QueryAssignmentRequest) (*QueryAssignmentResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method QueryAssignment
not implemented")
+ return nil, status.Error(codes.Unimplemented, "method QueryAssignment
not implemented")
}
func (UnimplementedMessagingServiceServer)
ReceiveMessage(*ReceiveMessageRequest,
grpc.ServerStreamingServer[ReceiveMessageResponse]) error {
- return status.Errorf(codes.Unimplemented, "method ReceiveMessage not
implemented")
+ return status.Error(codes.Unimplemented, "method ReceiveMessage not
implemented")
}
func (UnimplementedMessagingServiceServer) AckMessage(context.Context,
*AckMessageRequest) (*AckMessageResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method AckMessage not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method AckMessage not
implemented")
}
func (UnimplementedMessagingServiceServer)
ForwardMessageToDeadLetterQueue(context.Context,
*ForwardMessageToDeadLetterQueueRequest)
(*ForwardMessageToDeadLetterQueueResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method
ForwardMessageToDeadLetterQueue not implemented")
+ return nil, status.Error(codes.Unimplemented, "method
ForwardMessageToDeadLetterQueue not implemented")
}
func (UnimplementedMessagingServiceServer) PullMessage(*PullMessageRequest,
grpc.ServerStreamingServer[PullMessageResponse]) error {
- return status.Errorf(codes.Unimplemented, "method PullMessage not
implemented")
+ return status.Error(codes.Unimplemented, "method PullMessage not
implemented")
}
func (UnimplementedMessagingServiceServer) UpdateOffset(context.Context,
*UpdateOffsetRequest) (*UpdateOffsetResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method UpdateOffset not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method UpdateOffset not
implemented")
}
func (UnimplementedMessagingServiceServer) GetOffset(context.Context,
*GetOffsetRequest) (*GetOffsetResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method GetOffset not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method GetOffset not
implemented")
}
func (UnimplementedMessagingServiceServer) QueryOffset(context.Context,
*QueryOffsetRequest) (*QueryOffsetResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method QueryOffset not
implemented")
+ return nil, status.Error(codes.Unimplemented, "method QueryOffset not
implemented")
}
func (UnimplementedMessagingServiceServer) EndTransaction(context.Context,
*EndTransactionRequest) (*EndTransactionResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method EndTransaction
not implemented")
+ return nil, status.Error(codes.Unimplemented, "method EndTransaction
not implemented")
}
func (UnimplementedMessagingServiceServer)
Telemetry(grpc.BidiStreamingServer[TelemetryCommand, TelemetryCommand]) error {
- return status.Errorf(codes.Unimplemented, "method Telemetry not
implemented")
+ return status.Error(codes.Unimplemented, "method Telemetry not
implemented")
}
func (UnimplementedMessagingServiceServer)
NotifyClientTermination(context.Context, *NotifyClientTerminationRequest)
(*NotifyClientTerminationResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method
NotifyClientTermination not implemented")
+ return nil, status.Error(codes.Unimplemented, "method
NotifyClientTermination not implemented")
}
func (UnimplementedMessagingServiceServer)
ChangeInvisibleDuration(context.Context, *ChangeInvisibleDurationRequest)
(*ChangeInvisibleDurationResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method
ChangeInvisibleDuration not implemented")
+ return nil, status.Error(codes.Unimplemented, "method
ChangeInvisibleDuration not implemented")
}
func (UnimplementedMessagingServiceServer) RecallMessage(context.Context,
*RecallMessageRequest) (*RecallMessageResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method RecallMessage
not implemented")
+ return nil, status.Error(codes.Unimplemented, "method RecallMessage not
implemented")
}
func (UnimplementedMessagingServiceServer)
SyncLiteSubscription(context.Context, *SyncLiteSubscriptionRequest)
(*SyncLiteSubscriptionResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method
SyncLiteSubscription not implemented")
+ return nil, status.Error(codes.Unimplemented, "method
SyncLiteSubscription not implemented")
}
func (UnimplementedMessagingServiceServer)
mustEmbedUnimplementedMessagingServiceServer() {}
func (UnimplementedMessagingServiceServer) testEmbeddedByValue()
{}
@@ -552,7 +553,7 @@ type UnsafeMessagingServiceServer interface {
}
func RegisterMessagingServiceServer(s grpc.ServiceRegistrar, srv
MessagingServiceServer) {
- // If the following call pancis, it indicates
UnimplementedMessagingServiceServer was
+ // If the following call panics, it indicates
UnimplementedMessagingServiceServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at
initialization
// time to prevent it from happening at runtime later due to I/O.
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 21d63074..237ef4b5 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -105,6 +105,14 @@ func (pc *defaultPushConsumer)
changeInvisibleDuration0(context context.Context,
InvisibleDuration: durationpb.New(invisibleDuration),
MessageId: messageView.GetMessageId(),
}
+
+ // Set LiteTopic only for lite consumer
+ if messageView.GetLiteTopic() != "" && pc.pcSettings.GetClientType() ==
v2.ClientType_LITE_PUSH_CONSUMER {
+ request.LiteTopic = &messageView.liteTopic
+ suspend := true
+ request.Suspend = &suspend
+ }
+
watchTime := time.Now()
resp, err := pc.cli.clientManager.ChangeInvisibleDuration(ctx,
endpoints, request, pc.pcSettings.requestTimeout)
duration := time.Since(watchTime)
@@ -173,35 +181,23 @@ func (pc *defaultPushConsumer)
wrapReceiveMessageRequest(batchSize int, messageQ
}
func (pc *defaultPushConsumer) wrapAckMessageRequest(messageView *MessageView)
*v2.AckMessageRequest {
+ entry := &v2.AckMessageEntry{
+ MessageId: messageView.GetMessageId(),
+ ReceiptHandle: messageView.GetReceiptHandle(),
+ }
+
+ // Set LiteTopic only for lite consumer
if messageView.GetLiteTopic() != "" && pc.pcSettings.GetClientType() ==
v2.ClientType_LITE_PUSH_CONSUMER {
- return &v2.AckMessageRequest{
- Group: pc.pcSettings.groupName,
- Topic: &v2.Resource{
- Name: messageView.GetTopic(),
- ResourceNamespace: pc.cli.config.NameSpace,
- },
- Entries: []*v2.AckMessageEntry{
- {
- MessageId:
messageView.GetMessageId(),
- ReceiptHandle:
messageView.GetReceiptHandle(),
- LiteTopic: &messageView.liteTopic,
- },
- },
- }
- } else {
- return &v2.AckMessageRequest{
- Group: pc.pcSettings.groupName,
- Topic: &v2.Resource{
- Name: messageView.GetTopic(),
- ResourceNamespace: pc.cli.config.NameSpace,
- },
- Entries: []*v2.AckMessageEntry{
- {
- MessageId:
messageView.GetMessageId(),
- ReceiptHandle:
messageView.GetReceiptHandle(),
- },
- },
- }
+ entry.LiteTopic = &messageView.liteTopic
+ }
+
+ return &v2.AckMessageRequest{
+ Group: pc.pcSettings.groupName,
+ Topic: &v2.Resource{
+ Name: messageView.GetTopic(),
+ ResourceNamespace: pc.cli.config.NameSpace,
+ },
+ Entries: []*v2.AckMessageEntry{entry},
}
}
@@ -378,8 +374,13 @@ func (pc *defaultPushConsumer) Start() error {
threadPool := NewSimpleThreadPool("MessageConsumption",
int(pc.pcOpts.maxCacheMessageCount), int(pc.pcOpts.consumptionThreadCount))
if pc.pcSettings.isFifo {
- pc.consumerService = NewFiFoConsumeService(pc.cli.clientID,
pc.pcOpts.messageListener, threadPool, pc.cli,
pc.pcOpts.enableFifoConsumeAccelerator)
- pc.cli.log.Infof("Create FIFO consume service,
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t",
pc.cli.config.ConsumerGroup, pc.cli.clientID,
pc.pcOpts.enableFifoConsumeAccelerator)
+ if pc.pcSettings.GetClientType() ==
v2.ClientType_LITE_PUSH_CONSUMER {
+ pc.consumerService =
NewLiteFifoConsumeService(pc.cli.clientID, pc.pcOpts.messageListener,
threadPool, pc.cli, pc.pcOpts.enableFifoConsumeAccelerator)
+ pc.cli.log.Infof("Create Lite FIFO consume service,
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t",
pc.cli.config.ConsumerGroup, pc.cli.clientID,
pc.pcOpts.enableFifoConsumeAccelerator)
+ } else {
+ pc.consumerService =
NewFiFoConsumeService(pc.cli.clientID, pc.pcOpts.messageListener, threadPool,
pc.cli, pc.pcOpts.enableFifoConsumeAccelerator)
+ pc.cli.log.Infof("Create FIFO consume service,
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t",
pc.cli.config.ConsumerGroup, pc.cli.clientID,
pc.pcOpts.enableFifoConsumeAccelerator)
+ }
} else {
pc.consumerService = NewStandardConsumeService(pc.cli.clientID,
pc.pcOpts.messageListener, threadPool, pc.cli)
pc.cli.log.Infof("Create standard consume service,
consumerGroup=%s, clientId=%s", pc.cli.config.ConsumerGroup, pc.cli.clientID)
@@ -601,7 +602,7 @@ func (pc *defaultPushConsumer) ack0(ctx context.Context,
messageView *MessageVie
}
func (pc *defaultPushConsumer)
wrapForwardMessageToDeadLetterQueueRequest(messageView *MessageView)
*v2.ForwardMessageToDeadLetterQueueRequest {
- return &v2.ForwardMessageToDeadLetterQueueRequest{
+ request := &v2.ForwardMessageToDeadLetterQueueRequest{
Group: pc.pcSettings.groupName,
Topic: &v2.Resource{
Name: messageView.GetTopic(),
@@ -612,6 +613,13 @@ func (pc *defaultPushConsumer)
wrapForwardMessageToDeadLetterQueueRequest(messag
DeliveryAttempt: messageView.GetDeliveryAttempt(),
MaxDeliveryAttempts: pc.pcSettings.GetRetryPolicy().MaxAttempts,
}
+
+ // Set LiteTopic only for lite consumer
+ if messageView.GetLiteTopic() != "" && pc.pcSettings.GetClientType() ==
v2.ClientType_LITE_PUSH_CONSUMER {
+ request.LiteTopic = &messageView.liteTopic
+ }
+
+ return request
}
func (pc *defaultPushConsumer) ForwardMessageToDeadLetterQueue(ctx
context.Context, messageView *MessageView) error {
diff --git a/golang/push_consumer_options.go b/golang/push_consumer_options.go
index 411fca19..b62572e5 100644
--- a/golang/push_consumer_options.go
+++ b/golang/push_consumer_options.go
@@ -26,19 +26,44 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)
-type ConsumerResult int8
+// ConsumerResultType represents the type of consumer result
+type ConsumerResultType int8
const (
- /**
- * Consume message successfully.
- */
- SUCCESS ConsumerResult = 0
- /**
- * Failed to consume message.
- */
- FAILURE ConsumerResult = 1
+ // ConsumerResultTypeSuccess represents successful consumption
+ ConsumerResultTypeSuccess ConsumerResultType = 0
+ // ConsumerResultTypeFailure represents failed consumption
+ ConsumerResultTypeFailure ConsumerResultType = 1
+ // ConsumerResultTypeSuspend represents suspended consumption
+ ConsumerResultTypeSuspend ConsumerResultType = 2
)
+// Predefined consumer results for backward compatibility
+var (
+ SUCCESS = ConsumerResult{Type: ConsumerResultTypeSuccess}
+ FAILURE = ConsumerResult{Type: ConsumerResultTypeFailure}
+)
+
+// Minimum suspend time allowed
+const minSuspendTime = 50 * time.Millisecond
+
+// NewConsumerResultSuspend creates a new ConsumerResult with the specified
suspend time
+func NewConsumerResultSuspend(suspendTime time.Duration) ConsumerResult {
+ if suspendTime < minSuspendTime {
+ panic(fmt.Sprintf("suspend time cannot be less than %v, got
%v", minSuspendTime, suspendTime))
+ }
+ return ConsumerResult{
+ Type: ConsumerResultTypeSuspend,
+ suspendTime: suspendTime,
+ }
+}
+
+// ConsumerResult represents the result of message consumption
+type ConsumerResult struct {
+ Type ConsumerResultType
+ suspendTime time.Duration
+}
+
type MessageListener interface {
consume(*MessageView) ConsumerResult
}
@@ -55,23 +80,23 @@ func (l *FuncMessageListener) consume(msg *MessageView)
ConsumerResult {
var _ = MessageListener(&FuncMessageListener{})
type pushConsumerOptions struct {
- subscriptionExpressions *sync.Map
- awaitDuration time.Duration
- maxCacheMessageCount int32
- maxCacheMessageSizeInBytes int64
- consumptionThreadCount int32
- messageListener MessageListener
- clientFunc NewClientFunc
- enableFifoConsumeAccelerator bool
+ subscriptionExpressions *sync.Map
+ awaitDuration time.Duration
+ maxCacheMessageCount int32
+ maxCacheMessageSizeInBytes int64
+ consumptionThreadCount int32
+ messageListener MessageListener
+ clientFunc NewClientFunc
+ enableFifoConsumeAccelerator bool
}
var defaultPushConsumerOptions = pushConsumerOptions{
- clientFunc: NewClient,
- awaitDuration: 0,
- maxCacheMessageCount: 1024,
- maxCacheMessageSizeInBytes: 64 * 1024 * 1024,
- consumptionThreadCount: 20,
- enableFifoConsumeAccelerator: false,
+ clientFunc: NewClient,
+ awaitDuration: 0,
+ maxCacheMessageCount: 1024,
+ maxCacheMessageSizeInBytes: 64 * 1024 * 1024,
+ consumptionThreadCount: 20,
+ enableFifoConsumeAccelerator: false,
}
// A ConsumerOption sets options such as tag, etc.
diff --git a/golang/push_consumer_test.go b/golang/push_consumer_test.go
index 45b0ddbd..85d43037 100644
--- a/golang/push_consumer_test.go
+++ b/golang/push_consumer_test.go
@@ -233,3 +233,67 @@ func TestDefaultLitePushConsumer_Wraps(t *testing.T) {
t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v",
hb.GetClientType())
}
}
+
+// TestNewConsumerResultSuspend tests the NewConsumerResultSuspend function
+func TestNewConsumerResultSuspend(t *testing.T) {
+ tests := []struct {
+ name string
+ suspendTime time.Duration
+ wantType ConsumerResultType
+ wantPanic bool
+ }{
+ {
+ name: "valid suspend time - exactly
minSuspendTime",
+ suspendTime: minSuspendTime,
+ wantType: ConsumerResultTypeSuspend,
+ wantPanic: false,
+ },
+ {
+ name: "valid suspend time - 1 second",
+ suspendTime: 1 * time.Second,
+ wantType: ConsumerResultTypeSuspend,
+ wantPanic: false,
+ },
+ {
+ name: "invalid suspend time - less than
minSuspendTime",
+ suspendTime: 10 * time.Millisecond,
+ wantType: ConsumerResultTypeSuspend,
+ wantPanic: true,
+ },
+ {
+ name: "invalid suspend time - negative",
+ suspendTime: -1 * time.Millisecond,
+ wantType: ConsumerResultTypeSuspend,
+ wantPanic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.wantPanic {
+ defer func() {
+ if r := recover(); r == nil {
+
t.Errorf("NewConsumerResultSuspend() should have panicked for suspendTime=%v",
tt.suspendTime)
+ }
+ }()
+ NewConsumerResultSuspend(tt.suspendTime)
+ } else {
+ result :=
NewConsumerResultSuspend(tt.suspendTime)
+ if result.Type != tt.wantType {
+ t.Errorf("NewConsumerResultSuspend()
Type = %v, want %v", result.Type, tt.wantType)
+ }
+ }
+ })
+ }
+}
+
+// TestPredefinedConsumerResults tests the predefined SUCCESS and FAILURE
values
+func TestPredefinedConsumerResults(t *testing.T) {
+ if SUCCESS.Type != ConsumerResultTypeSuccess {
+ t.Errorf("SUCCESS.Type = %v, want %v", SUCCESS.Type,
ConsumerResultTypeSuccess)
+ }
+
+ if FAILURE.Type != ConsumerResultTypeFailure {
+ t.Errorf("FAILURE.Type = %v, want %v", FAILURE.Type,
ConsumerResultTypeFailure)
+ }
+}