AlexStocks commented on code in PR #3235:
URL: https://github.com/apache/dubbo-go/pull/3235#discussion_r2969087457
##########
filter/graceful_shutdown/consumer_filter.go:
##########
@@ -91,3 +129,107 @@ func (f *consumerGracefulShutdownFilter) Set(name string,
conf any) {
// do nothing
}
}
+
+// isClosingInvoker checks if invoker is in closing list
+func (f *consumerGracefulShutdownFilter) isClosingInvoker(invoker
base.Invoker) bool {
+ key := invoker.GetURL().String()
+ if expireTime, ok := f.closingInvokers.Load(key); ok {
+ if time.Now().Before(expireTime.(time.Time)) {
+ return true
+ }
+ f.closingInvokers.Delete(key)
+ if setter, ok := invoker.(base.AvailabilitySetter); ok {
+ setter.SetAvailable(true)
+ logger.Infof("Graceful shutdown --- Recovered invoker
availability after closing TTL --- %s", key)
+ }
+ }
+ return false
+}
+
+// isClosingResponse checks if response contains closing flag
+func (f *consumerGracefulShutdownFilter) isClosingResponse(result
result.Result) bool {
+ if result != nil && result.Attachments() != nil {
+ if v, ok :=
result.Attachments()[constant.GracefulShutdownClosingKey]; ok {
+ if v == "true" {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+// markClosingInvoker marks invoker as closing and sets available=false
+func (f *consumerGracefulShutdownFilter) markClosingInvoker(invoker
base.Invoker) {
+ key := invoker.GetURL().String()
+ expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
+ f.closingInvokers.Store(key, expireTime)
+
+ logger.Infof("Graceful shutdown --- Marked invoker as closing --- %s,
will expire at %v, IsAvailable=%v",
+ key, expireTime, invoker.IsAvailable())
+
+ if setter, ok := invoker.(base.AvailabilitySetter); ok {
+ setter.SetAvailable(false)
+ logger.Infof("Graceful shutdown --- Set invoker unavailable ---
%s, IsAvailable now=%v",
+ key, invoker.IsAvailable())
+ }
+}
+
+func (f *consumerGracefulShutdownFilter) handleClosingEvent(invoker
base.Invoker, source string) {
+ if f.closingEventHandler == nil || invoker == nil || invoker.GetURL()
== nil {
+ return
+ }
+
+ f.closingEventHandler.HandleClosingEvent(gracefulshutdown.ClosingEvent{
+ Source: source,
+ InstanceKey: invoker.GetURL().GetCacheInvokerMapKey(),
+ ServiceKey: invoker.GetURL().ServiceKey(),
+ Address: invoker.GetURL().Location,
+ })
+}
+
+func (f *consumerGracefulShutdownFilter) getClosingInvokerExpireTime()
time.Duration {
+ if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime
!= "" {
+ if duration, err :=
time.ParseDuration(f.shutdownConfig.ClosingInvokerExpireTime); err == nil &&
duration > 0 {
+ return duration
+ }
+ }
+ return 30 * time.Second
+}
+
+// handleRequestError handles request errors and marks invoker as unavailable
for connection errors
+func (f *consumerGracefulShutdownFilter) handleRequestError(invoker
base.Invoker, err error) {
+ if err == nil {
+ return
+ }
+
+ if isClosingError(err) {
+ f.markClosingInvoker(invoker)
+ f.handleClosingEvent(invoker, "connection-closing-error")
+ }
+}
+
+func isClosingError(err error) bool {
+ if errors.Is(err, base.ErrClientClosed) || errors.Is(err,
base.ErrDestroyedInvoker) {
+ return true
+ }
+
Review Comment:
## consumer_filter.go:174-181 - 错误检测依赖字符串匹配过于脆弱
strings.Contains(errMsg, ...) 使用字符串匹配检测关闭错误是脆弱的,不同 gRPC 版本可能返回不同的错误信息。
**建议**: 考虑使用 google.golang.org/grpc/status 的 FromError() 解析 error,然后检查
status.Code() 是否为 Unavailable 或 Canceled。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]