This is an automated email from the ASF dual-hosted git repository.

shreemaanabhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new a0552823c fix: use shdict instead of events module for nodes data exchange (#13066)
a0552823c is described below

commit a0552823c05d9f31707e34ce50fa4b9ba5792a8c
Author: Shreemaan Abhishek <[email protected]>
AuthorDate: Fri Mar 13 15:08:00 2026 +0545

    fix: use shdict instead of events module for nodes data exchange (#13066)
---
 apisix/cli/ops.lua                 |  10 ++
 apisix/discovery/consul/init.lua   | 181 ++++++++++++++++++++++++-------------
 apisix/discovery/consul/schema.lua |   5 +
 t/APISIX.pm                        |   1 +
 t/discovery/consul_dump.t          |   2 +-
 5 files changed, 137 insertions(+), 62 deletions(-)

diff --git a/apisix/cli/ops.lua b/apisix/cli/ops.lua
index f5e9beb17..c7435ee08 100644
--- a/apisix/cli/ops.lua
+++ b/apisix/cli/ops.lua
@@ -767,6 +767,16 @@ Please modify "admin_key" in conf/config.yaml .
 
     end
 
+    -- inject consul discovery shared dict
+    if enabled_discoveries["consul"] then
+        if not sys_conf["discovery_shared_dicts"] then
+            sys_conf["discovery_shared_dicts"] = {}
+        end
+
+        local consul_conf = yaml_conf.discovery["consul"]
+        sys_conf["discovery_shared_dicts"]["consul"] = consul_conf.shared_size or "10m"
+    end
+
     -- fix up lua path
     sys_conf["extra_lua_path"] = get_lua_path(yaml_conf.apisix.extra_lua_path)
     sys_conf["extra_lua_cpath"] = get_lua_path(yaml_conf.apisix.extra_lua_cpath)
diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index 4d3c0e46b..c27330812 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -31,6 +31,7 @@ local ngx_timer_at       = ngx.timer.at
 local ngx_timer_every    = ngx.timer.every
 local log                = core.log
 local json_delay_encode  = core.json.delay_encode
+local process            = require("ngx.process")
 local ngx_worker_id      = ngx.worker.id
 local exiting            = ngx.worker.exiting
 local thread_spawn       = ngx.thread.spawn
@@ -42,16 +43,28 @@ local null               = ngx.null
 local type               = type
 local next               = next
 
-local all_services = core.table.new(0, 5)
+local consul_dict = ngx.shared.consul
+if not consul_dict then
+    error("lua_shared_dict \"consul\" not configured")
+end
+
 local default_service
 local default_weight
 local sort_type
 local skip_service_map = core.table.new(0, 1)
 local dump_params
 
-local events
-local events_list
 local consul_services
+-- Per-worker LRU cache: avoids shared dict access on every request.
+-- neg_ttl caches unknown services. invalid_stale ensures expired
+-- entries are refreshed from the shared dict instead of re-cached.
+local nodes_cache = core.lrucache.new({
+    ttl = 1,
+    count = 1024,
+    invalid_stale = true,
+    neg_ttl = 1,
+    neg_count = 64,
+})
 
 local default_skip_services = {"consul"}
 local default_random_range = 5
@@ -66,53 +79,94 @@ local _M = {
 }
 
 
-local function discovery_consul_callback(data, event, source, pid)
-    all_services = data
-    log.notice("update local variable all_services, event is: ", event,
-        "source: ", source, "server pid:", pid,
-        ", all services: ", json_delay_encode(all_services, true))
-end
+local function fetch_node_from_shdict(service_name)
+    local value = consul_dict:get(service_name)
+    if not value then
+        return nil, "consul service not found: " .. service_name
+    end
 
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        return nil, "failed to decode nodes for service: "
+                    .. service_name .. ", error: " .. (err or "")
+    end
 
-function _M.all_nodes()
-    return all_services
+    return nodes
 end
 
 
-function _M.nodes(service_name)
-    if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
-        return
+function _M.all_nodes()
+    local keys = consul_dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    for i, key in ipairs(keys) do
+        local value = consul_dict:get(key)
+        if value then
+            local nodes, err = core.json.decode(value)
+            if nodes then
+                services[key] = nodes
+            else
+                log.error("failed to decode nodes for service: ", key, ", error: ", err)
+            end
+        end
+
+        if i % 100 == 0 then
+            ngx.sleep(0)
+        end
     end
+    return services
+end
 
-    local resp_list = all_services[service_name]
 
-    if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+function _M.nodes(service_name)
+    local nodes, err = nodes_cache(service_name, nil,
+                                   fetch_node_from_shdict, service_name)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
         return default_service and {default_service}
     end
 
-    log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-        json_delay_encode(resp_list, true))
+    log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+        json_delay_encode(nodes, true))
 
-    return resp_list
+    return nodes
 end
 
 
 local function update_all_services(consul_server_url, up_services)
-    -- clean old unused data
+    -- write new/updated values first so readers never see a missing service
+    local i = 0
+    for k, v in pairs(up_services) do
+        local content, err = core.json.encode(v)
+        if content then
+            local ok, set_err, forcible = consul_dict:set(k, content)
+            if not ok then
+                log.error("failed to set nodes for service: ", k, ", error: ", set_err,
+                          ", please consider increasing lua_shared_dict consul size")
+            elseif forcible then
+                log.warn("consul shared dict is full, forcibly evicting items while ",
+                         "setting nodes for service: ", k,
+                         ", please consider increasing lua_shared_dict consul size")
+            end
+        else
+            log.error("failed to encode nodes for service: ", k, ", error: ", err)
+        end
+        i = i + 1
+        if i % 100 == 0 then
+            ngx.sleep(0)
+        end
+    end
+
+    -- then delete keys that are no longer present
     local old_services = consul_services[consul_server_url] or {}
     for k, _ in pairs(old_services) do
-        all_services[k] = nil
+        if not up_services[k] then
+            consul_dict:delete(k)
+        end
     end
-    core.table.clear(old_services)
 
-    for k, v in pairs(up_services) do
-        all_services[k] = v
-    end
     consul_services[consul_server_url] = up_services
 
-    log.info("update all services: ", json_delay_encode(all_services, true))
+    log.info("update all services to shared dict")
 end
 
 
@@ -149,14 +203,30 @@ local function read_dump_services()
         return
     end
 
-    all_services = entity.services
-    log.info("load dump file into memory success")
+    for k, v in pairs(entity.services) do
+        local content, json_err = core.json.encode(v)
+        if content then
+            consul_dict:set(k, content)
+        else
+            log.error("failed to encode dump service: ", k, ", error: ", json_err)
+        end
+    end
+    log.info("load dump file into shared dict success")
 end
 
 
 local function write_dump_services()
+    -- build services from the privileged agent's in-memory tracking table
+    -- to avoid a full shared dict scan + JSON decode via _M.all_nodes()
+    local services = core.table.new(0, 8)
+    for _, svcs in pairs(consul_services) do
+        for k, v in pairs(svcs) do
+            services[k] = v
+        end
+    end
+
     local entity = {
-        services = all_services,
+        services = services,
         last_update = ngx.time(),
         expire = dump_params.expire, -- later need handle it
     }
@@ -556,14 +626,6 @@ function _M.connect(premature, consul_server, retry_delay)
 
         update_all_services(consul_server.consul_server_url, up_services)
 
-        --update events
-        local post_ok, post_err = events:post(events_list._source,
-                events_list.updating, all_services)
-        if not post_ok then
-            log.error("post_event failure with ", events_list._source,
-                ", update all services error: ", post_err)
-        end
-
         if dump_params then
             ngx_timer_at(0, write_dump_services)
         end
@@ -611,28 +673,8 @@ end
 
 function _M.init_worker()
     local consul_conf = local_conf.discovery.consul
+    dump_params = consul_conf.dump
 
-    if consul_conf.dump then
-        local dump = consul_conf.dump
-        dump_params = dump
-
-        if dump.load_on_init then
-            read_dump_services()
-        end
-    end
-
-    events = require("apisix.events")
-    events_list = events:event_list(
-            "discovery_consul_update_all_services",
-            "updating"
-    )
-
-    if 0 ~= ngx_worker_id() then
-        events:register(discovery_consul_callback, events_list._source, events_list.updating)
-        return
-    end
-
-    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
     default_weight = consul_conf.weight
     sort_type = consul_conf.sort_type
     -- set default service, used when the server node cannot be found
@@ -640,6 +682,23 @@ function _M.init_worker()
         default_service = consul_conf.default_service
         default_service.weight = default_weight
     end
+
+    if process.type() ~= "privileged agent" then
+        return
+    end
+
+    -- flush stale data that may persist across reloads,
+    -- since consul_services is re-initialized empty
+    consul_dict:flush_all()
+
+    if consul_conf.dump then
+        if consul_conf.dump.load_on_init then
+            read_dump_services()
+        end
+    end
+
+    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
+
     if consul_conf.skip_services then
         skip_service_map = core.table.new(0, #consul_conf.skip_services)
         for _, v in ipairs(consul_conf.skip_services) do
@@ -673,7 +732,7 @@ end
 
 
 function _M.dump_data()
-    return {config = local_conf.discovery.consul, services = all_services }
+    return {config = local_conf.discovery.consul, services = _M.all_nodes()}
 end
 
 
diff --git a/apisix/discovery/consul/schema.lua b/apisix/discovery/consul/schema.lua
index 5d6fc641e..06fa9cabf 100644
--- a/apisix/discovery/consul/schema.lua
+++ b/apisix/discovery/consul/schema.lua
@@ -24,6 +24,11 @@ return {
                 type = "string",
             }
         },
+        shared_size = {
+            type = "string",
+            pattern = [[^[1-9][0-9]*m$]],
+            default = "1m",
+        },
         token = {type = "string", default = ""},
         fetch_interval = {type = "integer", minimum = 1, default = 3},
         keepalive = {
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 5aa54a1af..ac0edef49 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -294,6 +294,7 @@ lua {
     lua_shared_dict standalone-config 10m;
     lua_shared_dict status-report 1m;
     lua_shared_dict nacos 10m;
+    lua_shared_dict consul 10m;
     lua_shared_dict upstream-healthcheck 10m;
 }
 _EOC_
diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t
index 9cb24a3c1..030ddf2f4 100644
--- a/t/discovery/consul_dump.t
+++ b/t/discovery/consul_dump.t
@@ -230,7 +230,7 @@ GET /hello
 --- error_code: 503
 --- error_log
 connect consul
-fetch nodes failed
+consul service not found
 failed to set upstream
 
 

Reply via email to