This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new dd8e60fc golang: some optimizations regarding transaction messages and
pushconsumer (#1021)
dd8e60fc is described below
commit dd8e60fc2a83104a1c2ce158f2034d5e827963ae
Author: guyinyou <[email protected]>
AuthorDate: Mon Jun 23 14:33:48 2025 +0800
golang: some optimizations regarding transaction messages and pushconsumer
(#1021)
Co-authored-by: guyinyou <[email protected]>
---
golang/consumer_service.go | 6 ++++++
golang/producer.go | 6 ------
golang/push_consumer.go | 1 +
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 846bd04c..4bb42959 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -25,6 +25,7 @@ import (
type ConsumeService interface {
consume(ProcessQueue, []*MessageView)
consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult,
error))
+ Shutdown() error
}
type baseConsumeService struct {
@@ -55,6 +56,11 @@ func (bcs *baseConsumeService)
consumeWithDuration(messageView *MessageView, dur
time.AfterFunc(duration, func() { bcs.consumptionExecutor.Submit(task)
})
}
+func (bcs *baseConsumeService) Shutdown() error {
+ bcs.consumptionExecutor.Shutdown()
+ return nil
+}
+
func (bcs *baseConsumeService) newConsumeTask(clientId string, messageListener
MessageListener, messageView *MessageView, messageInterceptor
MessageInterceptor, callback func(ConsumerResult, error)) func() {
return func() {
consumeResult := FAILURE
diff --git a/golang/producer.go b/golang/producer.go
index 58d4a226..b30dc5b7 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -235,12 +235,6 @@ func (p *defaultProducer) send1(ctx context.Context, topic
string, messageType v
topic, messageIds, maxAttempts, attempt,
endpoints, utils.GetRequestID(ctx))
return nil, err
}
- // No need more attempts for transactional message.
- if messageType == v2.MessageType_TRANSACTION {
- p.cli.log.Errorf("failed to send transactional message
finally, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%s,
requestId=%s",
- topic, messageIds, maxAttempts, attempt,
endpoints, utils.GetRequestID(ctx))
- return nil, err
- }
// Try to do more attempts.
nextAttempt := attempt + 1
// Retry immediately if the request is not throttled.
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 75a9c26e..f6a446a7 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -499,6 +499,7 @@ func (pc *defaultPushConsumer) dropProcessQueue(mqstr
utils.MessageQueueStr) {
}
func (pc *defaultPushConsumer) GracefulStop() error {
+ pc.consumerService.Shutdown()
return pc.cli.GracefulStop()
}