This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 1381f0af golang: add SetRequestTimeout implements
producer/simpleConsumer/pushConsumer (#1016)
1381f0af is described below
commit 1381f0af882c9725b98f7f47b863c623725d6f42
Author: guyinyou <[email protected]>
AuthorDate: Thu Jun 12 19:44:03 2025 +0800
golang: add SetRequestTimeout implements
producer/simpleConsumer/pushConsumer (#1016)
Co-authored-by: guyinyou <[email protected]>
---
golang/client.go | 1 +
golang/producer.go | 5 +++++
golang/push_consumer.go | 5 +++++
golang/simple_consumer.go | 5 +++++
4 files changed, 16 insertions(+)
diff --git a/golang/client.go b/golang/client.go
index 2d92b161..eb7bed4f 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -46,6 +46,7 @@ type Client interface {
type isClient interface {
isClient()
+ SetRequestTimeout(timeout time.Duration)
wrapHeartbeatRequest() *v2.HeartbeatRequest
onRecoverOrphanedTransactionCommand(endpoints *v2.Endpoints, command
*v2.RecoverOrphanedTransactionCommand) error
onVerifyMessageCommand(endpoints *v2.Endpoints, command
*v2.VerifyMessageCommand) error
diff --git a/golang/producer.go b/golang/producer.go
index 5f0a355d..58d4a226 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -455,3 +455,8 @@ func (p *defaultProducer)
onRecoverOrphanedTransactionCommand(endpoints *v2.Endp
func (p *defaultProducer) onVerifyMessageCommand(endpoints *v2.Endpoints,
command *v2.VerifyMessageCommand) error {
return nil
}
+
+func (p *defaultProducer) SetRequestTimeout(timeout time.Duration) {
+ p.cli.opts.timeout = timeout
+ p.pSetting.requestTimeout = p.cli.opts.timeout
+}
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index f0e9f13d..75a9c26e 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -70,6 +70,11 @@ type defaultPushConsumer struct {
consumptionErrorQuantity atomic.Int64
}
+func (pc *defaultPushConsumer) SetRequestTimeout(timeout time.Duration) {
+ pc.cli.opts.timeout = timeout
+ pc.pcSettings.requestTimeout = pc.cli.opts.timeout
+}
+
func (pc *defaultPushConsumer) isOn() bool {
return pc.cli.on.Load()
}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index abb41936..4db351e9 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -61,6 +61,11 @@ type defaultSimpleConsumer struct {
subTopicRouteDataResultCache sync.Map
}
+func (sc *defaultSimpleConsumer) SetRequestTimeout(timeout time.Duration) {
+ sc.cli.opts.timeout = timeout
+ sc.scSettings.requestTimeout = sc.cli.opts.timeout
+}
+
func (sc *defaultSimpleConsumer) isOn() bool {
return sc.cli.on.Load()
}