This is an automated email from the ASF dual-hosted git repository.
ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 09f6e3682 refactor: add healthcheck manager to decouple upstream
(#12426)
09f6e3682 is described below
commit 09f6e3682c5da1437f951273200d44cc18b13a28
Author: Ashish Tiwari <[email protected]>
AuthorDate: Thu Aug 7 18:50:18 2025 +0530
refactor: add healthcheck manager to decouple upstream (#12426)
---
apisix/balancer.lua | 5 +-
apisix/control/v1.lua | 16 +-
apisix/healthcheck_manager.lua | 323 ++++++++++++++++++++++++
apisix/init.lua | 8 +-
apisix/plugin.lua | 9 +-
apisix/plugins/ai.lua | 4 +-
apisix/upstream.lua | 147 +----------
t/control/healthcheck.t | 16 +-
t/control/routes.t | 8 +-
t/control/services.t | 12 +-
t/discovery/reset-healthchecker.t | 12 +-
t/node/healthcheck-discovery.t | 15 +-
t/node/healthcheck-https.t | 8 +-
t/node/healthcheck-ipv6.t | 2 +-
t/node/healthcheck-leak-bugfix.t | 4 +-
t/node/healthcheck-passive-resty-events.t | 20 +-
t/node/healthcheck-passive.t | 35 ++-
t/node/healthcheck-stop-checker.t | 25 +-
t/node/healthcheck.t | 7 +-
t/node/healthcheck2.t | 34 ++-
t/node/healthcheck3.t | 2 +-
t/node/priority-balancer/health-checker.t | 5 +
t/node/rr-balance.t | 15 +-
t/stream-node/healthcheck-resty-events.t | 8 +-
t/stream-node/healthcheck-resty-worker-events.t | 14 +-
25 files changed, 530 insertions(+), 224 deletions(-)
diff --git a/apisix/balancer.lua b/apisix/balancer.lua
index 0fe2e6539..6e3675e54 100644
--- a/apisix/balancer.lua
+++ b/apisix/balancer.lua
@@ -19,6 +19,7 @@ local balancer = require("ngx.balancer")
local core = require("apisix.core")
local priority_balancer = require("apisix.balancer.priority")
local apisix_upstream = require("apisix.upstream")
+local healthcheck_manager = require("apisix.healthcheck_manager")
local ipairs = ipairs
local is_http = ngx.config.subsystem == "http"
local enable_keepalive = balancer.enable_keepalive and is_http
@@ -28,7 +29,6 @@ local set_timeouts = balancer.set_timeouts
local ngx_now = ngx.now
local str_byte = string.byte
-
local module_name = "balancer"
local pickers = {}
@@ -75,7 +75,8 @@ local function fetch_health_nodes(upstream, checker)
local port = upstream.checks and upstream.checks.active and
upstream.checks.active.port
local up_nodes = core.table.new(0, #nodes)
for _, node in ipairs(nodes) do
- local ok, err = checker:get_target_status(node.host, port or
node.port, host)
+ local ok, err = healthcheck_manager.fetch_node_status(checker,
+ node.host, port or node.port,
host)
if ok then
up_nodes = transform_node(up_nodes, node)
elseif err then
diff --git a/apisix/control/v1.lua b/apisix/control/v1.lua
index 4d35018b8..f457eac0d 100644
--- a/apisix/control/v1.lua
+++ b/apisix/control/v1.lua
@@ -20,6 +20,7 @@ local plugin = require("apisix.plugin")
local get_routes = require("apisix.router").http_routes
local get_services = require("apisix.http.service").services
local upstream_mod = require("apisix.upstream")
+local healthcheck_manager = require("apisix.healthcheck_manager")
local get_upstreams = upstream_mod.upstreams
local collectgarbage = collectgarbage
local ipairs = ipairs
@@ -66,14 +67,13 @@ function _M.schema()
return 200, schema
end
-
local healthcheck
local function extra_checker_info(value)
if not healthcheck then
healthcheck = require("resty.healthcheck")
end
- local name = upstream_mod.get_healthchecker_name(value)
+ local name = healthcheck_manager.get_healthchecker_name(value.value)
local nodes, err = healthcheck.get_target_list(name,
"upstream-healthcheck")
if err then
core.log.error("healthcheck.get_target_list failed: ", err)
@@ -214,7 +214,6 @@ local function iter_and_find_healthcheck_info(values,
src_type, src_id)
if not checks then
return nil, str_format("no checker for %s[%s]", src_type,
src_id)
end
-
local info = extra_checker_info(value)
info.type = get_checker_type(checks)
return info
@@ -249,7 +248,6 @@ function _M.get_health_checker()
if not info then
return 404, {error_msg = err}
end
-
local out, err = try_render_html({stats={info}})
if out then
core.response.set_header("Content-Type", "text/html")
@@ -266,9 +264,6 @@ local function iter_add_get_routes_info(values, route_id)
local infos = {}
for _, route in core.config_util.iterate_values(values) do
local new_route = core.table.deepcopy(route)
- if new_route.value.upstream and new_route.value.upstream.parent then
- new_route.value.upstream.parent = nil
- end
-- remove healthcheck info
new_route.checker = nil
new_route.checker_idx = nil
@@ -312,9 +307,6 @@ local function iter_add_get_upstream_info(values,
upstream_id)
for _, upstream in core.config_util.iterate_values(values) do
local new_upstream = core.table.deepcopy(upstream)
core.table.insert(infos, new_upstream)
- if new_upstream.value and new_upstream.value.parent then
- new_upstream.value.parent = nil
- end
-- check the upstream id
if upstream_id and upstream.value.id == upstream_id then
return new_upstream
@@ -332,6 +324,7 @@ function _M.dump_all_upstreams_info()
return 200, infos
end
+
function _M.dump_upstream_info()
local upstreams = get_upstreams()
local uri_segs = core.utils.split_uri(ngx_var.uri)
@@ -354,9 +347,6 @@ local function iter_add_get_services_info(values, svc_id)
local infos = {}
for _, svc in core.config_util.iterate_values(values) do
local new_svc = core.table.deepcopy(svc)
- if new_svc.value.upstream and new_svc.value.upstream.parent then
- new_svc.value.upstream.parent = nil
- end
-- remove healthcheck info
new_svc.checker = nil
new_svc.checker_idx = nil
diff --git a/apisix/healthcheck_manager.lua b/apisix/healthcheck_manager.lua
new file mode 100644
index 000000000..066349829
--- /dev/null
+++ b/apisix/healthcheck_manager.lua
@@ -0,0 +1,323 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require = require
+local ipairs = ipairs
+local pcall = pcall
+local exiting = ngx.worker.exiting
+local pairs = pairs
+local tostring = tostring
+local core = require("apisix.core")
+local config_local = require("apisix.core.config_local")
+local healthcheck
+local events = require("apisix.events")
+local tab_clone = core.table.clone
+local timer_every = ngx.timer.every
+local string_sub = string.sub
+
+local _M = {}
+local working_pool = {} -- resource_path -> {version = ver, checker =
checker}
+local waiting_pool = {} -- resource_path -> resource_ver
+
+local DELAYED_CLEAR_TIMEOUT = 10
+local healthcheck_shdict_name = "upstream-healthcheck"
+local is_http = ngx.config.subsystem == "http"
+if not is_http then
+ healthcheck_shdict_name = healthcheck_shdict_name .. "-" ..
ngx.config.subsystem
+end
+
+
+local function get_healthchecker_name(value)
+ return "upstream#" .. (value.resource_key or value.upstream.resource_key)
+end
+_M.get_healthchecker_name = get_healthchecker_name
+
+
+local function remove_etcd_prefix(key)
+ local prefix = ""
+ local local_conf = config_local.local_conf()
+ local role = core.table.try_read_attr(local_conf, "deployment", "role")
+ local provider = core.table.try_read_attr(local_conf, "deployment",
"role_" ..
+ role, "config_provider")
+ if provider == "etcd" and local_conf.etcd and local_conf.etcd.prefix then
+ prefix = local_conf.etcd.prefix
+ end
+ return string_sub(key, #prefix + 1)
+end
+
+
+local function fetch_latest_conf(resource_path)
+ local resource_type, id
+ -- Handle both formats:
+ -- 1. /<etcd-prefix>/<resource_type>/<id>
+ -- 2. /<resource_type>/<id>
+ resource_path = remove_etcd_prefix(resource_path)
+ resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
+ if not resource_type or not id then
+ core.log.error("invalid resource path: ", resource_path)
+ return nil
+ end
+
+ local key
+ if resource_type == "upstreams" then
+ key = "/upstreams"
+ elseif resource_type == "routes" then
+ key = "/routes"
+ elseif resource_type == "services" then
+ key = "/services"
+ elseif resource_type == "stream_routes" then
+ key = "/stream_routes"
+ else
+ core.log.error("unsupported resource type: ", resource_type)
+ return nil
+ end
+
+ local data = core.config.fetch_created_obj(key)
+ if not data then
+ core.log.error("failed to fetch configuration for type: ", key)
+ return nil
+ end
+ local resource = data:get(id)
+ if not resource then
+ -- this can happen if the resource was deleted
+        -- after this function was called, so we don't throw an error
+ core.log.warn("resource not found: ", id, " in ", key,
+ "this can happen if the resource was deleted")
+ return nil
+ end
+
+ return resource
+end
+
+
+local function create_checker(up_conf)
+ if not up_conf.checks then
+ return nil
+ end
+ local local_conf = config_local.local_conf()
+ if local_conf and local_conf.apisix and
local_conf.apisix.disable_upstream_healthcheck then
+ core.log.info("healthchecker won't be created: disabled upstream
healthcheck")
+ return nil
+ end
+ core.log.info("creating healthchecker for upstream: ",
up_conf.resource_key)
+ if not healthcheck then
+ healthcheck = require("resty.healthcheck")
+ end
+
+ local checker, err = healthcheck.new({
+ name = get_healthchecker_name(up_conf),
+ shm_name = healthcheck_shdict_name,
+ checks = up_conf.checks,
+ events_module = events:get_healthcheck_events_modele(),
+ })
+
+ if not checker then
+ core.log.error("failed to create healthcheck: ", err)
+ return nil
+ end
+
+ -- Add target nodes
+ local host = up_conf.checks and up_conf.checks.active and
up_conf.checks.active.host
+ local port = up_conf.checks and up_conf.checks.active and
up_conf.checks.active.port
+ local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
+ local use_node_hdr = up_conf.pass_host == "node" or nil
+
+ for _, node in ipairs(up_conf.nodes) do
+ local host_hdr = up_hdr or (use_node_hdr and node.domain)
+ local ok, err = checker:add_target(node.host, port or node.port, host,
+ true, host_hdr)
+ if not ok then
+ core.log.error("failed to add healthcheck target: ", node.host,
":",
+ port or node.port, " err: ", err)
+ end
+ end
+
+ return checker
+end
+
+
+function _M.fetch_checker(resource_path, resource_ver)
+ local working_item = working_pool[resource_path]
+ if working_item and working_item.version == resource_ver then
+ return working_item.checker
+ end
+
+ if waiting_pool[resource_path] == resource_ver then
+ return nil
+ end
+
+ -- Add to waiting pool with version
+ core.log.info("adding ", resource_path, " to waiting pool with version: ",
resource_ver)
+ waiting_pool[resource_path] = resource_ver
+ return nil
+end
+
+
+function _M.fetch_node_status(checker, ip, port, hostname)
+ -- check if the checker is valid
+ if not checker or checker.dead then
+ return true
+ end
+
+ return checker:get_target_status(ip, port, hostname)
+end
+
+
+local function add_working_pool(resource_path, resource_ver, checker)
+ working_pool[resource_path] = {
+ version = resource_ver,
+ checker = checker
+ }
+end
+
+local function find_in_working_pool(resource_path, resource_ver)
+ local checker = working_pool[resource_path]
+ if not checker then
+ return nil -- not found
+ end
+
+ if checker.version ~= resource_ver then
+ core.log.info("version mismatch for resource: ", resource_path,
+ " current version: ", checker.version, " requested
version: ", resource_ver)
+ return nil -- version not match
+ end
+ return checker
+end
+
+
+function _M.upstream_version(index, nodes_ver)
+ if not index then
+ return
+ end
+ return index .. tostring(nodes_ver or '')
+end
+
+
+local function timer_create_checker()
+ if core.table.nkeys(waiting_pool) == 0 then
+ return
+ end
+
+ local waiting_snapshot = tab_clone(waiting_pool)
+ for resource_path, resource_ver in pairs(waiting_snapshot) do
+ do
+ if find_in_working_pool(resource_path, resource_ver) then
+ core.log.info("resource: ", resource_path,
+ " already in working pool with version: ",
+ resource_ver)
+ goto continue
+ end
+ local res_conf = fetch_latest_conf(resource_path)
+ if not res_conf then
+ goto continue
+ end
+ local upstream = res_conf.value.upstream or res_conf.value
+ local new_version = _M.upstream_version(res_conf.modifiedIndex,
upstream._nodes_ver)
+ core.log.info("checking waiting pool for resource: ",
resource_path,
+ " current version: ", new_version, " requested version: ",
resource_ver)
+ if resource_ver ~= new_version then
+ goto continue
+ end
+
+ -- if a checker exists then delete it before creating a new one
+ local existing_checker = working_pool[resource_path]
+ if existing_checker then
+ existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
+ existing_checker.checker:stop()
+ core.log.info("releasing existing checker: ",
tostring(existing_checker.checker),
+ " for resource: ", resource_path, " and version:
",
+ existing_checker.version)
+ end
+ local checker = create_checker(upstream)
+ if not checker then
+ goto continue
+ end
+ core.log.info("create new checker: ", tostring(checker), " for
resource: ",
+ resource_path, " and version: ", resource_ver)
+ add_working_pool(resource_path, resource_ver, checker)
+ end
+
+ ::continue::
+ waiting_pool[resource_path] = nil
+ end
+end
+
+
+local function timer_working_pool_check()
+ if core.table.nkeys(working_pool) == 0 then
+ return
+ end
+
+ local working_snapshot = tab_clone(working_pool)
+ for resource_path, item in pairs(working_snapshot) do
+ --- remove from working pool if resource doesn't exist
+ local res_conf = fetch_latest_conf(resource_path)
+ local need_destroy = true
+ if res_conf and res_conf.value then
+ local current_ver = _M.upstream_version(res_conf.modifiedIndex,
+ res_conf.value._nodes_ver)
+ core.log.info("checking working pool for resource: ",
resource_path,
+ " current version: ", current_ver, " item version: ",
item.version)
+ if item.version == current_ver then
+ need_destroy = false
+ end
+ end
+
+ if need_destroy then
+ working_pool[resource_path] = nil
+ item.checker.dead = true
+ item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
+ item.checker:stop()
+ core.log.info("try to release checker: ", tostring(item.checker),
" for resource: ",
+ resource_path, " and version : ", item.version)
+ end
+ end
+end
+
+function _M.init_worker()
+ local timer_create_checker_running = false
+ local timer_working_pool_check_running = false
+ timer_every(1, function ()
+ if not exiting() then
+ if timer_create_checker_running then
+ core.log.warn("timer_create_checker is already running,
skipping this iteration")
+ return
+ end
+ timer_create_checker_running = true
+ local ok, err = pcall(timer_create_checker)
+ if not ok then
+ core.log.error("failed to run timer_create_checker: ", err)
+ end
+ timer_create_checker_running = false
+ end
+ end)
+ timer_every(1, function ()
+ if not exiting() then
+ if timer_working_pool_check_running then
+ core.log.warn("timer_working_pool_check is already running
skipping iteration")
+ return
+ end
+ timer_working_pool_check_running = true
+ local ok, err = pcall(timer_working_pool_check)
+ if not ok then
+ core.log.error("failed to run timer_working_pool_check: ", err)
+ end
+ timer_working_pool_check_running = false
+ end
+ end)
+end
+
+return _M
diff --git a/apisix/init.lua b/apisix/init.lua
index f518f2028..d56fbc3e1 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -255,8 +255,12 @@ local function parse_domain_in_route(route)
-- don't modify the modifiedIndex to avoid plugin cache miss because of
DNS resolve result
-- has changed
- route.dns_value = core.table.deepcopy(route.value, { shallows = {
"self.upstream.parent"}})
+ route.dns_value = core.table.deepcopy(route.value)
route.dns_value.upstream.nodes = new_nodes
+ if not route.dns_value._nodes_ver then
+ route.dns_value._nodes_ver = 0
+ end
+ route.dns_value._nodes_ver = route.dns_value._nodes_ver + 1
core.log.info("parse route which contain domain: ",
core.json.delay_encode(route, true))
return route
@@ -842,7 +846,7 @@ local function healthcheck_passive(api_ctx)
end
local up_conf = api_ctx.upstream_conf
- local passive = up_conf.checks.passive
+ local passive = up_conf.checks and up_conf.checks.passive
if not passive then
return
end
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index de5421a93..87f024d66 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -583,7 +583,7 @@ end
local function merge_service_route(service_conf, route_conf)
- local new_conf = core.table.deepcopy(service_conf, { shallows =
{"self.value.upstream.parent"}})
+ local new_conf = core.table.deepcopy(service_conf)
new_conf.value.service_id = new_conf.value.id
new_conf.value.id = route_conf.value.id
new_conf.modifiedIndex = route_conf.modifiedIndex
@@ -601,8 +601,6 @@ local function merge_service_route(service_conf, route_conf)
local route_upstream = route_conf.value.upstream
if route_upstream then
new_conf.value.upstream = route_upstream
- -- when route's upstream override service's upstream,
- -- the upstream.parent still point to the route
new_conf.value.upstream_id = nil
new_conf.has_domain = route_conf.has_domain
end
@@ -657,7 +655,7 @@ end
local function merge_service_stream_route(service_conf, route_conf)
-- because many fields in Service are not supported by stream route,
-- so we copy the stream route as base object
- local new_conf = core.table.deepcopy(route_conf, { shallows =
{"self.value.upstream.parent"}})
+ local new_conf = core.table.deepcopy(route_conf)
if service_conf.value.plugins then
for name, conf in pairs(service_conf.value.plugins) do
if not new_conf.value.plugins then
@@ -705,8 +703,7 @@ local function merge_consumer_route(route_conf,
consumer_conf, consumer_group_co
return route_conf
end
- local new_route_conf = core.table.deepcopy(route_conf,
- { shallows = {"self.value.upstream.parent"}})
+ local new_route_conf = core.table.deepcopy(route_conf)
if consumer_group_conf then
for name, conf in pairs(consumer_group_conf.value.plugins) do
diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua
index 278201d4e..39430c7ad 100644
--- a/apisix/plugins/ai.lua
+++ b/apisix/plugins/ai.lua
@@ -69,9 +69,7 @@ local default_keepalive_pool = {}
local function create_router_matching_cache(api_ctx)
orig_router_http_matching(api_ctx)
- return core.table.deepcopy(api_ctx, {
- shallows = { "self.matched_route.value.upstream.parent" }
- })
+ return core.table.deepcopy(api_ctx)
end
diff --git a/apisix/upstream.lua b/apisix/upstream.lua
index 3d3b6b426..e55694f7e 100644
--- a/apisix/upstream.lua
+++ b/apisix/upstream.lua
@@ -16,11 +16,9 @@
--
local require = require
local core = require("apisix.core")
-local config_local = require("apisix.core.config_local")
local discovery = require("apisix.discovery.init").discovery
local upstream_util = require("apisix.utils.upstream")
local apisix_ssl = require("apisix.ssl")
-local events = require("apisix.events")
local error = error
local tostring = tostring
local ipairs = ipairs
@@ -29,12 +27,7 @@ local pcall = pcall
local ngx_var = ngx.var
local is_http = ngx.config.subsystem == "http"
local upstreams
-local healthcheck
-
-local healthcheck_shdict_name = "upstream-healthcheck"
-if not is_http then
- healthcheck_shdict_name = healthcheck_shdict_name .. "-" ..
ngx.config.subsystem
-end
+local healthcheck_manager
local set_upstream_tls_client_param
local ok, apisix_ngx_upstream = pcall(require, "resty.apisix.upstream")
@@ -86,120 +79,6 @@ end
_M.set = set_directly
-local function release_checker(healthcheck_parent)
- if not healthcheck_parent or not healthcheck_parent.checker then
- return
- end
- local checker = healthcheck_parent.checker
- core.log.info("try to release checker: ", tostring(checker))
- checker:delayed_clear(3)
- checker:stop()
-end
-
-
-local function get_healthchecker_name(value)
- return "upstream#" .. value.key
-end
-_M.get_healthchecker_name = get_healthchecker_name
-
-
-local function create_checker(upstream)
- local local_conf = config_local.local_conf()
- if local_conf and local_conf.apisix and
local_conf.apisix.disable_upstream_healthcheck then
- core.log.info("healthchecker won't be created: disabled upstream
healthcheck")
- return nil
- end
- if healthcheck == nil then
- healthcheck = require("resty.healthcheck")
- end
-
- local healthcheck_parent = upstream.parent
- if healthcheck_parent.checker and healthcheck_parent.checker_upstream ==
upstream
- and healthcheck_parent.checker_nodes_ver == upstream._nodes_ver then
- return healthcheck_parent.checker
- end
-
- if upstream.is_creating_checker then
- core.log.info("another request is creating new checker")
- return nil
- end
- upstream.is_creating_checker = true
-
- core.log.debug("events module used by the healthcheck: ",
events.events_module,
- ", module name: ",events:get_healthcheck_events_modele())
-
- local checker, err = healthcheck.new({
- name = get_healthchecker_name(healthcheck_parent),
- shm_name = healthcheck_shdict_name,
- checks = upstream.checks,
- -- the events.init_worker will be executed in the init_worker phase,
- -- events.healthcheck_events_module is set
- -- while the healthcheck object is executed in the http access phase,
- -- so it can be used here
- events_module = events:get_healthcheck_events_modele(),
- })
-
- if not checker then
- core.log.error("fail to create healthcheck instance: ", err)
- upstream.is_creating_checker = nil
- return nil
- end
-
- if healthcheck_parent.checker then
- local ok, err = pcall(core.config_util.cancel_clean_handler,
healthcheck_parent,
- healthcheck_parent.checker_idx,
true)
- if not ok then
- core.log.error("cancel clean handler error: ", err)
- end
- end
-
- core.log.info("create new checker: ", tostring(checker))
-
- local host = upstream.checks and upstream.checks.active and
upstream.checks.active.host
- local port = upstream.checks and upstream.checks.active and
upstream.checks.active.port
- local up_hdr = upstream.pass_host == "rewrite" and upstream.upstream_host
- local use_node_hdr = upstream.pass_host == "node" or nil
- for _, node in ipairs(upstream.nodes) do
- local host_hdr = up_hdr or (use_node_hdr and node.domain)
- local ok, err = checker:add_target(node.host, port or node.port, host,
- true, host_hdr)
- if not ok then
- core.log.error("failed to add new health check target: ",
node.host, ":",
- port or node.port, " err: ", err)
- end
- end
-
- local check_idx, err =
core.config_util.add_clean_handler(healthcheck_parent, release_checker)
- if not check_idx then
- upstream.is_creating_checker = nil
- checker:clear()
- checker:stop()
- core.log.error("failed to add clean handler, err:",
- err, " healthcheck parent:",
core.json.delay_encode(healthcheck_parent, true))
-
- return nil
- end
-
- healthcheck_parent.checker = checker
- healthcheck_parent.checker_upstream = upstream
- healthcheck_parent.checker_nodes_ver = upstream._nodes_ver
- healthcheck_parent.checker_idx = check_idx
-
- upstream.is_creating_checker = nil
-
- return checker
-end
-
-
-local function fetch_healthchecker(upstream)
- if not upstream.checks then
- return nil
- end
-
- return create_checker(upstream)
-end
-
-
local function set_upstream_scheme(ctx, upstream)
-- plugins like proxy-rewrite may already set ctx.upstream_scheme
if not ctx.upstream_scheme then
@@ -334,8 +213,8 @@ function _M.set_by_route(route, api_ctx)
up_conf.nodes = new_nodes
end
- local id = up_conf.parent.value.id
- local conf_version = up_conf.parent.modifiedIndex
+ local id = up_conf.resource_id
+ local conf_version = up_conf.resource_version
-- include the upstream object as part of the version, because the
upstream will be changed
-- by service discovery or dns resolver.
set_directly(api_ctx, id, conf_version .. "#" .. tostring(up_conf) .. "#"
@@ -343,7 +222,6 @@ function _M.set_by_route(route, api_ctx)
local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 0 then
- release_checker(up_conf.parent)
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node"
end
@@ -365,8 +243,9 @@ function _M.set_by_route(route, api_ctx)
ngx_var.upstream_sni = sni
end
end
-
- local checker = fetch_healthchecker(up_conf)
+ local resource_version =
healthcheck_manager.upstream_version(up_conf.resource_version,
+
up_conf._nodes_ver)
+ local checker =
healthcheck_manager.fetch_checker(up_conf.resource_key, resource_version)
api_ctx.up_checker = checker
return
end
@@ -377,10 +256,10 @@ function _M.set_by_route(route, api_ctx)
if not ok then
return 503, err
end
-
- local checker = fetch_healthchecker(up_conf)
+ local resource_version =
healthcheck_manager.upstream_version(up_conf.resource_version,
+
up_conf._nodes_ver )
+ local checker = healthcheck_manager.fetch_checker(up_conf.resource_key,
resource_version)
api_ctx.up_checker = checker
-
local scheme = up_conf.scheme
if (scheme == "https" or scheme == "grpcs") and up_conf.tls then
@@ -569,9 +448,9 @@ local function filter_upstream(value, parent)
if not value then
return
end
-
- value.parent = parent
-
+ value.resource_key = parent and parent.key
+ value.resource_version = ((parent and parent.modifiedIndex) or
value.modifiedIndex)
+ value.resource_id = ((parent and parent.value.id) or value.id)
if not is_http and value.scheme == "http" then
-- For L4 proxy, the default scheme is "tcp"
value.scheme = "tcp"
@@ -633,6 +512,8 @@ function _M.init_worker()
error("failed to create etcd instance for fetching upstream: " .. err)
return
end
+ healthcheck_manager = require("apisix.healthcheck_manager")
+ healthcheck_manager.init_worker()
end
diff --git a/t/control/healthcheck.t b/t/control/healthcheck.t
index 9673ab917..749c3e62a 100644
--- a/t/control/healthcheck.t
+++ b/t/control/healthcheck.t
@@ -75,7 +75,7 @@ upstreams:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET"})
- ngx.sleep(2.2)
+ ngx.sleep(4)
local _, _, res = t.test('/v1/healthcheck',
ngx.HTTP_GET)
@@ -84,6 +84,7 @@ upstreams:
table.sort(res[1].nodes, function(a, b)
return a.ip < b.ip
end)
+ ngx.log(ngx.WARN, core.json.stably_encode(res[1].nodes))
ngx.say(core.json.stably_encode(res[1].nodes))
local _, _, res = t.test('/v1/healthcheck/upstreams/1',
@@ -96,6 +97,7 @@ upstreams:
local _, _, res = t.test('/v1/healthcheck/upstreams/1',
ngx.HTTP_GET, nil, nil, {["Accept"] = "text/html"})
+ ngx.sleep(4)
local xml2lua = require("xml2lua")
local xmlhandler = require("xmlhandler.tree")
local handler = xmlhandler:new()
@@ -114,6 +116,7 @@ upstreams:
end
end
end
+ ngx.sleep(4)
assert(matches == 2, "unexpected html")
}
}
@@ -125,6 +128,7 @@ unhealthy TCP increment (2/2) for
'127.0.0.2(127.0.0.2:1988)'
--- response_body
[{"counter":{"http_failure":0,"success":0,"tcp_failure":0,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1980,"status":"healthy"},{"counter":{"http_failure":0,"success":0,"tcp_failure":2,"timeout_failure":0},"hostname":"127.0.0.2","ip":"127.0.0.2","port":1988,"status":"unhealthy"}]
[{"counter":{"http_failure":0,"success":0,"tcp_failure":0,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1980,"status":"healthy"},{"counter":{"http_failure":0,"success":0,"tcp_failure":2,"timeout_failure":0},"hostname":"127.0.0.2","ip":"127.0.0.2","port":1988,"status":"unhealthy"}]
+--- timeout: 14
@@ -168,7 +172,7 @@ routes:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET"})
- ngx.sleep(2.2)
+ ngx.sleep(4)
local code, body, res = t.test('/v1/healthcheck',
ngx.HTTP_GET)
@@ -185,6 +189,7 @@ routes:
return a.port < b.port
end)
ngx.say(json.encode(res))
+ ngx.sleep(4)
}
}
--- grep_error_log eval
@@ -195,6 +200,7 @@ unhealthy TCP increment (2/2) for
'127.0.0.1(127.0.0.1:1988)'
--- response_body
[{"name":"/routes/1","nodes":[{"counter":{"http_failure":0,"success":0,"tcp_failure":0,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1980,"status":"healthy"},{"counter":{"http_failure":0,"success":0,"tcp_failure":2,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1988,"status":"unhealthy"}],"type":"http"}]
{"name":"/routes/1","nodes":[{"counter":{"http_failure":0,"success":0,"tcp_failure":0,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1980,"status":"healthy"},{"counter":{"http_failure":0,"success":0,"tcp_failure":2,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1988,"status":"unhealthy"}],"type":"http"}
+--- timeout: 10
@@ -243,7 +249,7 @@ services:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET"})
- ngx.sleep(2.2)
+ ngx.sleep(4)
local code, body, res = t.test('/v1/healthcheck',
ngx.HTTP_GET)
@@ -260,6 +266,7 @@ services:
return a.port < b.port
end)
ngx.say(json.encode(res))
+ ngx.sleep(4)
}
}
--- grep_error_log eval
@@ -270,6 +277,7 @@ unhealthy TCP increment (2/2) for
'127.0.0.1(127.0.0.1:1988)'
--- response_body
[{"name":"/services/1","nodes":[{"counter":{"http_failure":0,"success":0,"tcp_failure":2,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1988,"status":"unhealthy"}],"type":"http"}]
{"name":"/services/1","nodes":[{"counter":{"http_failure":0,"success":0,"tcp_failure":2,"timeout_failure":0},"hostname":"127.0.0.1","ip":"127.0.0.1","port":1988,"status":"unhealthy"}],"type":"http"}
+--- timeout: 9
@@ -278,6 +286,7 @@ unhealthy TCP increment (2/2) for
'127.0.0.1(127.0.0.1:1988)'
location /t {
content_by_lua_block {
local t = require("lib.test_admin")
+ ngx.sleep(4)
local code, body, res = t.test('/v1/healthcheck',
ngx.HTTP_GET)
ngx.print(res)
@@ -285,6 +294,7 @@ unhealthy TCP increment (2/2) for
'127.0.0.1(127.0.0.1:1988)'
}
--- response_body
{}
+--- timeout: 5
diff --git a/t/control/routes.t b/t/control/routes.t
index a24bc2c15..b707655f6 100644
--- a/t/control/routes.t
+++ b/t/control/routes.t
@@ -75,8 +75,8 @@ routes:
end
}
}
---- response_body
-{"upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"},"uris":["/hello"]}
+--- response_body eval
+qr/\{"upstream":\{"hash_on":"vars","nodes":\[\{"host":"127.0.0.1","port":1980,"weight":1\}\],"pass_host":"pass",.*"scheme":"http","type":"roundrobin"\},"uris":\["\/hello"\]\}/
@@ -108,8 +108,8 @@ routes:
end
}
}
---- response_body
-{"upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"},"uris":["/hello"]}
+--- response_body eval
+qr/\{"upstream":\{"hash_on":"vars","nodes":\[\{"host":"127.0.0.1","port":1980,"weight":1\}\],"pass_host":"pass",.*"scheme":"http","type":"roundrobin"\},"uris":\["\/hello"\]\}/
diff --git a/t/control/services.t b/t/control/services.t
index 0003bcc9d..3a959fe4c 100644
--- a/t/control/services.t
+++ b/t/control/services.t
@@ -75,8 +75,8 @@ services:
return
}
}
---- response_body
-{"id":"200","upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"}}
+--- response_body eval
+qr/\{"id":"200","upstream":\{"hash_on":"vars","nodes":\[\{"host":"127.0.0.1","port":1980,"weight":1\}\],"pass_host":"pass".*,"scheme":"http","type":"roundrobin"\}\}/
@@ -117,8 +117,8 @@ services:
return
}
}
---- response_body
-[{"id":"200","upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"}},{"id":"201","upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.2","port":1980,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"}}]
+--- response_body eval
+qr/\{"id":"200","upstream":\{"hash_on":"vars","nodes":\[\{"host":"127.0.0.1","port":1980,"weight":1\}\],"pass_host":"pass".*,"scheme":"http","type":"roundrobin"\}\}/
@@ -156,8 +156,8 @@ services:
return
}
}
---- response_body
-{"id":"5","plugins":{"limit-count":{"allow_degradation":false,"count":2,"key":"remote_addr","key_type":"var","policy":"local","rejected_code":503,"show_limit_quota_header":true,"time_window":60}},"upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"}}
+--- response_body eval
+qr/\{"id":"5","plugins":\{"limit-count":\{"allow_degradation":false,"count":2,"key":"remote_addr","key_type":"var","policy":"local","rejected_code":503,"show_limit_quota_header":true,"time_window":60\}\},"upstream":\{"hash_on":"vars","nodes":\[\{"host":"127.0.0.1","port":1980,"weight":1\}\],"pass_host":"pass",.*"scheme":"http","type":"roundrobin"\}\}/
diff --git a/t/discovery/reset-healthchecker.t
b/t/discovery/reset-healthchecker.t
index 8612f1fd7..9816aabaa 100644
--- a/t/discovery/reset-healthchecker.t
+++ b/t/discovery/reset-healthchecker.t
@@ -102,16 +102,16 @@ routes:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
ngx.say(res.body)
- ngx.sleep(5)
+ ngx.sleep(20)
}
}
--- request
GET /t
--- response_body
ok
---- timeout: 22
+--- timeout: 37
--- no_error_log
-unhealthy TCP increment (10/30)
+unhealthy TCP increment (20/30)
@@ -158,12 +158,12 @@ routes:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
ngx.status = res.status
- ngx.sleep(5)
+ ngx.sleep(20)
}
}
--- request
GET /t
---- timeout: 22
+--- timeout: 37
--- no_error_log
-unhealthy TCP increment (10/30)
+unhealthy TCP increment (20/30)
--- error_code: 503
diff --git a/t/node/healthcheck-discovery.t b/t/node/healthcheck-discovery.t
index 8a9b0e976..9978bba93 100644
--- a/t/node/healthcheck-discovery.t
+++ b/t/node/healthcheck-discovery.t
@@ -94,7 +94,7 @@ routes:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
- ngx.sleep(1.5)
+ ngx.sleep(4)
ngx.say(res.status)
}
@@ -104,6 +104,7 @@ qr/unhealthy TCP increment \(1\/2\) for
'127.0.0.1\([^)]+\)'/
--- grep_error_log_out
unhealthy TCP increment (1/2) for '127.0.0.1(127.0.0.1:1988)'
unhealthy TCP increment (1/2) for '127.0.0.1(0.0.0.0:1988)'
+--- timeout: 5
@@ -130,7 +131,7 @@ routes:
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
- ngx.sleep(0.5)
+ ngx.sleep(3)
discovery.mock = {
nodes = function()
@@ -144,15 +145,17 @@ routes:
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
+ ngx.sleep(20)
ngx.say(res.status)
}
}
--- grep_error_log eval
-qr/(create new checker|try to release checker): table/
+qr/(create new checker|releasing existing checker): table/
--- grep_error_log_out
create new checker: table
-try to release checker: table
+releasing existing checker: table
create new checker: table
+--- timeout: 30
@@ -179,7 +182,7 @@ routes:
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
- ngx.sleep(0.5)
+ ngx.sleep(2)
discovery.mock = {
nodes = function()
@@ -192,6 +195,7 @@ routes:
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
+ ngx.sleep(2)
ngx.say(res.status)
}
}
@@ -199,3 +203,4 @@ routes:
qr/(create new checker|try to release checker): table/
--- grep_error_log_out
create new checker: table
+--- timeout: 5
diff --git a/t/node/healthcheck-https.t b/t/node/healthcheck-https.t
index b1f7b7ba0..8b2efa5ae 100644
--- a/t/node/healthcheck-https.t
+++ b/t/node/healthcheck-https.t
@@ -153,7 +153,7 @@ __DATA__
local httpc = http.new()
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/ping"
local _, _ = httpc:request_uri(uri, {method = "GET", keepalive =
false})
- ngx.sleep(0.5)
+ ngx.sleep(2)
local healthcheck_uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/v1/healthcheck/routes/1"
local httpc = http.new()
@@ -232,7 +232,7 @@ GET /t
local httpc = http.new()
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/ping"
local _, _ = httpc:request_uri(uri, {method = "GET", keepalive =
false})
- ngx.sleep(1.5)
+ ngx.sleep(4)
local healthcheck_uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/v1/healthcheck/routes/1"
local httpc = http.new()
@@ -260,6 +260,7 @@ GET /t
qr/\([^)]+\) unhealthy .* for '.*'/
--- grep_error_log_out
(upstream#/apisix/routes/1) unhealthy HTTP increment (1/1) for
'127.0.0.1(127.0.0.1:8766)'
+--- timeout: 8
@@ -314,7 +315,7 @@ qr/\([^)]+\) unhealthy .* for '.*'/
local httpc = http.new()
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/ping"
local _, _ = httpc:request_uri(uri, {method = "GET", keepalive =
false})
- ngx.sleep(1.5)
+ ngx.sleep(4)
local healthcheck_uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/v1/healthcheck/routes/1"
local httpc = http.new()
@@ -339,3 +340,4 @@ qr/\([^)]+\) unhealthy .* for '.*'/
--- request
GET /t
--- error_code: 200
+--- timeout: 8
diff --git a/t/node/healthcheck-ipv6.t b/t/node/healthcheck-ipv6.t
index dc33dece2..78c328356 100644
--- a/t/node/healthcheck-ipv6.t
+++ b/t/node/healthcheck-ipv6.t
@@ -106,7 +106,7 @@ qr/^.*?\[error\](?!.*process exiting).*/
ngx.log(ngx.ERR, "It works")
end
- ngx.sleep(2.5)
+ ngx.sleep(3)
local ports_count = {}
for i = 1, 12 do
diff --git a/t/node/healthcheck-leak-bugfix.t b/t/node/healthcheck-leak-bugfix.t
index 1caf5d348..bcab5689d 100644
--- a/t/node/healthcheck-leak-bugfix.t
+++ b/t/node/healthcheck-leak-bugfix.t
@@ -101,8 +101,9 @@ location /t {
local t = require("lib.test_admin").test
assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg) < 300)
t('/hello', ngx.HTTP_GET)
+ ngx.sleep(2)
assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg) < 300)
- ngx.sleep(1)
+ ngx.sleep(2)
}
}
@@ -110,3 +111,4 @@ location /t {
GET /t
--- error_log
clear checker
+--- timeout: 7
diff --git a/t/node/healthcheck-passive-resty-events.t
b/t/node/healthcheck-passive-resty-events.t
index d90cbece7..faba3fa7f 100644
--- a/t/node/healthcheck-passive-resty-events.t
+++ b/t/node/healthcheck-passive-resty-events.t
@@ -115,7 +115,7 @@ passed
ngx.say(err)
return
end
- ngx.sleep(1) -- Wait for health check unhealthy events sync
+ ngx.sleep(2) -- Wait for health check unhealthy events sync
local ports_count = {}
for i = 1, 6 do
@@ -291,7 +291,7 @@ passed
local json_sort = require("toolkit.json")
local http = require("resty.http")
local uri = "http://127.0.0.1:" .. ngx.var.server_port
-
+ ngx.sleep(3.5)
local ports_count = {}
local httpc = http.new()
local res, err = httpc:request_uri(uri .. "/hello_")
@@ -300,13 +300,23 @@ passed
return
end
ngx.say(res.status)
-
+ ngx.sleep(3.5)
+ --- The first request above triggers the passive healthcheck
+ --- The healthchecker is asynchronously created after a minimum of
1 second
+ --- So we need to wait for it to be created and send another
request to verify
-- only /hello_ has passive healthcheck
+ local res, err = httpc:request_uri(uri .. "/hello_")
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.sleep(2)
local res, err = httpc:request_uri(uri .. "/hello")
if not res then
ngx.say(err)
return
end
+
ngx.say(res.status)
}
}
@@ -319,6 +329,7 @@ GET /t
qr/enabled healthcheck passive/
--- grep_error_log_out
enabled healthcheck passive
+--- timeout: 15
@@ -359,7 +370,7 @@ enabled healthcheck passive
end
ngx.say(res.status)
- ngx.sleep(1) -- Wait for health check unhealthy events sync
+ ngx.sleep(4) -- Wait for health check unhealthy events sync
-- The second time request to /hello_
local res, err = httpc:request_uri(uri .. "/hello_")
@@ -380,3 +391,4 @@ GET /t
qr/\[healthcheck\] \([^)]+\) unhealthy HTTP increment/
--- grep_error_log_out
[healthcheck] (upstream#/apisix/routes/2) unhealthy HTTP increment
+--- timeout: 6
diff --git a/t/node/healthcheck-passive.t b/t/node/healthcheck-passive.t
index 7404ff016..8de3eab8d 100644
--- a/t/node/healthcheck-passive.t
+++ b/t/node/healthcheck-passive.t
@@ -96,12 +96,19 @@ passed
--- config
location /t {
content_by_lua_block {
- ngx.sleep(1) -- wait for sync
+ ngx.sleep(2) -- wait for sync
local json_sort = require("toolkit.json")
local http = require("resty.http")
local uri = "http://127.0.0.1:" .. ngx.var.server_port ..
"/server_port"
-
+ --- trigger the passive healthcheck
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.sleep(3)
local ports_count = {}
for i = 1, 6 do
local httpc = http.new()
@@ -125,6 +132,7 @@ GET /t
{"200":5,"502":1}
--- error_log
(upstream#/apisix/routes/1) unhealthy HTTP increment (1/1)
+--- timeout: 7
@@ -285,7 +293,17 @@ passed
return
end
ngx.say(res.status)
-
+ ngx.sleep(2)
+ --- The first request above triggers the passive healthcheck
+ --- The healthchecker is asynchronously created after a minimum of
1 second
+ --- So we need to wait for it to be created and send another
request to verify
+ -- only /hello_ has passive healthcheck
+ local res, err = httpc:request_uri(uri .. "/hello_")
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.sleep(2)
-- only /hello_ has passive healthcheck
local res, err = httpc:request_uri(uri .. "/hello")
if not res then
@@ -304,6 +322,7 @@ GET /t
qr/enabled healthcheck passive/
--- grep_error_log_out
enabled healthcheck passive
+--- timeout: 7
@@ -325,6 +344,16 @@ enabled healthcheck passive
end
ngx.say(res.status)
+ local res, err = httpc:request_uri(uri .. "/hello_")
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.sleep(2)
+ --- The first request above triggers the passive healthcheck
+ --- The healthchecker is asynchronously created after a minimum of
1 second
+ --- So we need to wait for it to be created and send another
request to verify
+ -- only /hello_ has passive healthcheck
local res, err = httpc:request_uri(uri .. "/hello_")
if not res then
ngx.say(err)
diff --git a/t/node/healthcheck-stop-checker.t
b/t/node/healthcheck-stop-checker.t
index 54ed61763..2fecc8b88 100644
--- a/t/node/healthcheck-stop-checker.t
+++ b/t/node/healthcheck-stop-checker.t
@@ -84,15 +84,15 @@ PUT /apisix/admin/routes/1
end
ngx.say("1 code: ", code)
- ngx.sleep(0.2)
+ ngx.sleep(3)
local code, body = t('/server_port', "GET")
ngx.say("2 code: ", code)
- ngx.sleep(0.2)
+ ngx.sleep(2)
code = t('/apisix/admin/routes/1', "DELETE")
ngx.say("3 code: ", code)
- ngx.sleep(0.2)
+ ngx.sleep(3)
local code, body = t('/server_port', "GET")
ngx.say("4 code: ", code)
}
@@ -109,6 +109,7 @@ qr/create new checker: table: 0x|try to release checker:
table: 0x/
--- grep_error_log_out
create new checker: table: 0x
try to release checker: table: 0x
+--- timeout: 10
@@ -128,7 +129,7 @@ PUT /apisix/admin/routes/1
local code, body = t('/server_port', "GET")
ngx.say("1 code: ", code)
-
+ ngx.sleep(2)
local code, status, body = t('/apisix/admin/routes/1',
"PUT",
[[{"uri":"/server_port","upstream":{"type":"roundrobin","nodes":{"127.0.0.1:1980":1,"127.0.0.1:1981":1},"checks":{"active":{"http_path":"/status","healthy":{"interval":1,"successes":1},"unhealthy":{"interval":1,"http_failures":2}}}}}]]
@@ -139,9 +140,10 @@ PUT /apisix/admin/routes/1
end
ngx.say("2 code: ", code)
- ngx.sleep(0.2)
+ ngx.sleep(2)
local code, body = t('/server_port', "GET")
ngx.say("3 code: ", code)
+ ngx.sleep(2)
}
}
--- request
@@ -156,6 +158,7 @@ qr/create new checker: table: 0x|try to release checker:
table: 0x/
create new checker: table: 0x
try to release checker: table: 0x
create new checker: table: 0x
+--- timeout: 7
@@ -187,7 +190,7 @@ create new checker: table: 0x
return
end
- ngx.sleep(0.2)
+ ngx.sleep(1)
code, _, body = t('/server_port', "GET")
if code > 300 then
@@ -196,7 +199,7 @@ create new checker: table: 0x
return
end
- ngx.sleep(0.5)
+ ngx.sleep(3)
-- update
code, _, body = t('/apisix/admin/upstreams/stopchecker',
@@ -210,7 +213,7 @@ create new checker: table: 0x
return
end
- ngx.sleep(0.2)
+ ngx.sleep(3)
code, _, body = t('/server_port', "GET")
if code > 300 then
@@ -219,6 +222,7 @@ create new checker: table: 0x
return
end
+ ngx.sleep(3)
-- delete
code, _, body = t('/apisix/admin/routes/1', "DELETE")
@@ -227,7 +231,7 @@ create new checker: table: 0x
ngx.say(body)
return
end
- ngx.sleep(0.5) -- wait for routes delete event synced
+ ngx.sleep(3) -- wait for routes delete event synced
code, _, body = t('/apisix/admin/upstreams/stopchecker', "DELETE")
@@ -236,7 +240,7 @@ create new checker: table: 0x
ngx.say(body)
return
end
-
+ ngx.sleep(10)
ngx.say("ok")
}
}
@@ -251,3 +255,4 @@ create new checker: table: 0x
try to release checker: table: 0x
create new checker: table: 0x
try to release checker: table: 0x
+--- timeout: 30
diff --git a/t/node/healthcheck.t b/t/node/healthcheck.t
index 546b06db4..5eba37c21 100644
--- a/t/node/healthcheck.t
+++ b/t/node/healthcheck.t
@@ -93,7 +93,7 @@ qr/^.*?\[error\](?!.*process exiting).*/
ngx.say(err)
return
end
-
+ ngx.sleep(3)
local ports_count = {}
for i = 1, 12 do
local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
@@ -614,7 +614,7 @@ passed
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
- ngx.sleep(2)
+ ngx.sleep(4)
ngx.say(res.status)
}
@@ -627,6 +627,7 @@ GET /t
qr/^.*?\[warn\].*/
--- grep_error_log_out eval
qr/unhealthy TCP increment.*foo.com/
+--- timeout: 5
@@ -688,7 +689,7 @@ passed
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive
= false})
- ngx.sleep(2)
+ ngx.sleep(4)
ngx.say(res.status)
}
diff --git a/t/node/healthcheck2.t b/t/node/healthcheck2.t
index d63e80ebd..8c5fef231 100644
--- a/t/node/healthcheck2.t
+++ b/t/node/healthcheck2.t
@@ -131,6 +131,7 @@ routes:
table.sort(ports_arr, cmd)
ngx.say(require("toolkit.json").encode(ports_arr))
+ ngx.sleep(2.5)
ngx.exit(200)
}
}
@@ -257,6 +258,7 @@ routes:
--- config
location /t {
content_by_lua_block {
+ ngx.sleep(3)
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/server_port"
@@ -265,14 +267,19 @@ routes:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
end
-
- ngx.sleep(1)
+ ngx.sleep(3)
+ --- the active health check is created asynchronously after at least 1 second
so it will take effect
+ --- from the next request; the first request will just trigger it.
+ do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ end
+ ngx.sleep(3)
}
}
---- no_error_log
-client request host: localhost
--- error_log
client request host: 127.0.0.1
+--- timeout: 10
@@ -309,12 +316,19 @@ routes:
local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
end
- ngx.sleep(1)
+ --- the active health check is created asynchronously after at least 1 second
so it will take effect
+ --- from the next request; the first request will just trigger it.
+ do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ end
+ ngx.sleep(3)
}
}
--- error_log
client request host: localhost
client request host: 127.0.0.1
+--- timeout: 10
@@ -351,8 +365,13 @@ routes:
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
end
-
- ngx.sleep(1)
+ --- the active health check is created asynchronously after at least 1 second
so it will take effect
+ --- from the next request; the first request will just trigger it.
+ do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ end
+ ngx.sleep(3)
}
}
--- no_error_log
@@ -360,3 +379,4 @@ client request host: localhost
client request host: 127.0.0.1
--- error_log
client request host: foo.com
+--- timeout: 10
diff --git a/t/node/healthcheck3.t b/t/node/healthcheck3.t
index a1209afa9..2e158de1b 100644
--- a/t/node/healthcheck3.t
+++ b/t/node/healthcheck3.t
@@ -110,7 +110,7 @@ qr/^.*?\[error\](?!.*process exiting).*/
for i, th in ipairs(t) do
ngx.thread.wait(th)
end
-
+ ngx.sleep(4)
ngx.exit(200)
}
}
diff --git a/t/node/priority-balancer/health-checker.t
b/t/node/priority-balancer/health-checker.t
index cd970c667..1348c9cf6 100644
--- a/t/node/priority-balancer/health-checker.t
+++ b/t/node/priority-balancer/health-checker.t
@@ -94,6 +94,7 @@ upstreams:
ngx.sleep(2.5)
-- still use all nodes
httpc:request_uri(uri, {method = "GET"})
+ ngx.sleep(2.5)
}
}
--- request
@@ -109,6 +110,7 @@ proxy request to 127.0.0.1:1979
proxy request to 127.0.0.2:1979
proxy request to 127.0.0.1:1979
proxy request to 127.0.0.2:1979
+--- timeout: 8
@@ -169,6 +171,7 @@ passed
httpc:request_uri(uri, {method = "GET"})
ngx.sleep(2.5)
httpc:request_uri(uri, {method = "GET"})
+ ngx.sleep(2.5)
}
}
--- request
@@ -181,4 +184,6 @@ qr/proxy request to \S+/
--- grep_error_log_out
proxy request to 127.0.0.1:1979
proxy request to 127.0.0.1:1980
+proxy request to 127.0.0.1:1979
proxy request to 127.0.0.1:1980
+--- timeout: 8
diff --git a/t/node/rr-balance.t b/t/node/rr-balance.t
index 74bbf9ea8..69c4cfebb 100644
--- a/t/node/rr-balance.t
+++ b/t/node/rr-balance.t
@@ -138,7 +138,10 @@ passed
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/server_port"
-
+ --- to trigger the healthchecker
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET"})
+ ngx.sleep(3)
local ports_count = {}
for i = 1, 12 do
local httpc = http.new()
@@ -211,7 +214,10 @@ passed
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/server_port"
-
+ --- to trigger the healthchecker
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET"})
+ ngx.sleep(3)
local ports_count = {}
for i = 1, 12 do
local httpc = http.new()
@@ -284,7 +290,10 @@ passed
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/server_port"
-
+ --- to trigger the healthchecker
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET"})
+ ngx.sleep(3)
local ports_count = {}
for i = 1, 12 do
local httpc = http.new()
diff --git a/t/stream-node/healthcheck-resty-events.t
b/t/stream-node/healthcheck-resty-events.t
index 16bd5934b..e97abc3bd 100644
--- a/t/stream-node/healthcheck-resty-events.t
+++ b/t/stream-node/healthcheck-resty-events.t
@@ -94,7 +94,7 @@ passed
sock:close()
-- wait for health check to take effect
- ngx.sleep(2.5)
+ ngx.sleep(10)
for i = 1, 3 do
local sock = ngx.socket.tcp()
@@ -133,12 +133,12 @@ passed
end
-- wait for checker to release
- ngx.sleep(1)
+ ngx.sleep(3)
ngx.say("passed")
}
}
---- timeout: 10
+--- timeout: 15
--- request
GET /t
--- response_body
@@ -227,7 +227,7 @@ passed
local data, _ = sock:receive()
assert(data == nil, "first request should fail")
sock:close()
-
+ ngx.sleep(8)
-- Due to the implementation of lua-resty-events, it relies on the
kernel and
-- the Nginx event loop to process socket connections.
-- When lua-resty-healthcheck handles passive healthchecks and
uses lua-resty-events
diff --git a/t/stream-node/healthcheck-resty-worker-events.t
b/t/stream-node/healthcheck-resty-worker-events.t
index a841cba6f..923b36603 100644
--- a/t/stream-node/healthcheck-resty-worker-events.t
+++ b/t/stream-node/healthcheck-resty-worker-events.t
@@ -94,7 +94,7 @@ passed
sock:close()
-- wait for health check to take effect
- ngx.sleep(2.5)
+ ngx.sleep(4.5)
for i = 1, 3 do
local sock = ngx.socket.tcp()
@@ -227,6 +227,18 @@ passed
local data, _ = sock:receive()
assert(data == nil, "first request should fail")
sock:close()
+ ngx.sleep(2)
+
+
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.say("failed to connect: ", err)
+ return
+ end
+ local data, _ = sock:receive()
+ assert(data == nil, "one more request fail")
+ sock:close()
+ ngx.sleep(2)
for i = 1, 3 do
local sock = ngx.socket.tcp()