AlexStocks commented on code in PR #892:
URL: https://github.com/apache/dubbo-go-pixiu/pull/892#discussion_r2934588042


##########
pkg/hotreload/http_handler.go:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hotreload
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "os"
+       "sync"
+       "time"
+)
+
+import (
+       "gopkg.in/yaml.v3"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/config"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+var (
+       reloadMutex sync.Mutex
+       configPath  string
+)
+
+func SetConfigPath(path string) {
+       configPath = path
+}
+
+// ReloadHandler handles HTTP reload requests
+type ReloadHandler struct{}
+
+// ServeHTTP handles the reload HTTP request
+func (h *ReloadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       if r.Method != http.MethodPost {
+               http.Error(w, "Method not allowed, use POST", http.StatusMethodNotAllowed)
+               return
+       }
+
+       logger.Info("Received reload request via HTTP")
+
+       var err error
+       if r.ContentLength > 0 {
+               err = triggerConfigReloadFromBody(r)
+       } else {
+               err = triggerConfigReload()
+       }
+
+       if err != nil {
+               logger.Errorf("Reload failed: %v", err)
+               http.Error(w, fmt.Sprintf("Reload failed: %v", err), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       writeJSONResponse(w, "success", "Configuration reloaded successfully")
+       logger.Info("Reload completed successfully")
+}
+
+func writeJSONResponse(w http.ResponseWriter, status, message string) {
+       response := map[string]string{
+               "status":  status,
+               "message": message,
+               "time":    time.Now().Format(time.RFC3339),
+       }
+       if err := json.NewEncoder(w).Encode(response); err != nil {
+               logger.Errorf("Failed to encode response: %v", err)
+       }
+}
+
+// HealthHandler handles health check requests
+type HealthHandler struct{}
+
+// ServeHTTP handles the health check HTTP request
+func (h *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       writeJSONResponse(w, "healthy", "")
+}
+
+// triggerConfigReload reloads configuration from file
+func triggerConfigReload() error {
+       reloadMutex.Lock()
+       defer reloadMutex.Unlock()
+
+       if configPath == "" {
+               return fmt.Errorf("config path not set")
+       }
+
+       logger.Infof("Reloading configuration from: %s", configPath)
+
+       content, err := os.ReadFile(configPath)
+       if err != nil {
+               return fmt.Errorf("failed to read config file: %w", err)
+       }
+
+       return reloadFromYAML(content)
+}
+
+// triggerConfigReloadFromBody reloads configuration from HTTP request body
+func triggerConfigReloadFromBody(r *http.Request) error {
+       reloadMutex.Lock()
+       defer reloadMutex.Unlock()
+
+       logger.Info("Reloading configuration from request body")
+
+       content, err := io.ReadAll(r.Body)
+       if err != nil {
+               return fmt.Errorf("failed to read request body: %w", err)
+       }
+       defer func() {
+               if err := r.Body.Close(); err != nil {
+                       logger.Errorf("Failed to close request body: %v", err)
+               }
+       }()
+
+       return reloadFromYAML(content)
+}
+
+// reloadFromYAML performs the actual reload from YAML content
+func reloadFromYAML(content []byte) error {
+       newConfig := &model.Bootstrap{}
+       if err := yaml.Unmarshal(content, newConfig); err != nil {
+               return fmt.Errorf("failed to parse config: %w", err)
+       }
+
+       if err := config.Adapter(newConfig); err != nil {
+               return fmt.Errorf("failed to adapt config: %w", err)
+       }
+
+       oldConfig := config.GetBootstrap()
+       if oldConfig == nil {
+               return fmt.Errorf("current config is nil")
+       }
+
+       logger.Infof("Old config has %d listeners, new config has %d listeners",
+               len(oldConfig.StaticResources.Listeners), len(newConfig.StaticResources.Listeners))
+
+       wg := &sync.WaitGroup{}

Review Comment:
   各 reloader 被并行执行,但 `RouteReloader.HotReload` 调用了 `ClearRouterListeners()` + `UpdateListener()`,`ClusterReloader.HotReload` 也在同时操作 server 内部状态,这些操作之间没有任何同步机制,存在并发竞争。
   
   另外 `reloadFromYAML` 跳过了 `CheckUpdate` 检查,即使配置没变也会执行 reload,和 `hotreload.go` 里 `Coordinator.hotReload` 的逻辑不一致。
   
   建议:(1) 不要并行执行 reloaders,改为串行,或直接复用已有的 `coordinator.hotReload(newConfig)` 方法;(2) 补上 `CheckUpdate` 检查。



##########
pkg/common/router/router.go:
##########
@@ -65,7 +75,23 @@ func CreateRouterCoordinator(routeConfig *model.RouteConfiguration) *RouterCoord
        return rc
 }
 
+func (rm *RouterCoordinator) Close() {
+       if rm.dynamic {
+               routerMgr := server.GetRouterManager()
+               if routerMgr != nil {
+                       routerMgr.RemoveRouterListener(rm)
+               }
+       }
+}
+
 func (rm *RouterCoordinator) Route(hc *http.HttpContext) (*model.RouteAction, error) {
+       if rm.needsRegistration && rm.dynamic {

Review Comment:
   `needsRegistration` 在热路径(每个请求都会走的 `Route` 方法)中被无锁读写。多个 goroutine 同时处理请求时,可能多次调用 `AddRouterListener(rm)` 导致重复注册。
   
   `RouterManager.AddRouterListener` 只是 `append`,没有去重逻辑,重复注册会导致路由更新时同一个 coordinator 被通知多次。
   
   建议用 `sync.Once` 或 `atomic.Bool` 保证只注册一次。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to