Copilot commented on code in PR #13050:
URL: https://github.com/apache/apisix/pull/13050#discussion_r2922648622
##########
docs/en/latest/plugins/ai-proxy-multi.md:
##########
@@ -7,7 +7,7 @@ keywords:
- ai-proxy-multi
- AI
- LLM
-description: The ai-proxy-multi Plugin extends the capabilities of ai-proxy
with load balancing, retries, fallbacks, and health chekcs, simplifying the
integration with OpenAI, DeepSeek, Azure, AIMLAPI, Anthropic, OpenRouter,
Gemini, Vertex AI, and other OpenAI-compatible APIs.
+description: The ai-proxy-multi Plugin extends the capabilities of ai-proxy
with load balancing, retries, fallbacks, and health chekcs, simplifying the
integration with OpenAI, DeepSeek, Azure, AIMLAPI, Anthropic, Anthropic-native,
OpenRouter, Gemini, Vertex AI, and other OpenAI-compatible APIs.
Review Comment:
Spelling: "health chekcs" should be "health checks" (also appears in the
page description metadata).
```suggestion
description: The ai-proxy-multi Plugin extends the capabilities of ai-proxy
with load balancing, retries, fallbacks, and health checks, simplifying the
integration with OpenAI, DeepSeek, Azure, AIMLAPI, Anthropic, Anthropic-native,
OpenRouter, Gemini, Vertex AI, and other OpenAI-compatible APIs.
```
##########
docs/en/latest/plugins/ai-proxy-multi.md:
##########
@@ -35,7 +35,7 @@ description: The ai-proxy-multi Plugin extends the
capabilities of ai-proxy with
## Description
-The `ai-proxy-multi` Plugin simplifies access to LLM and embedding models by
transforming Plugin configurations into the designated request format for
OpenAI, DeepSeek, Azure, AIMLAPI, Anthropic, OpenRouter, Gemini, Vertex AI, and
other OpenAI-compatible APIs. It extends the capabilities of
[`ai-proxy`](./ai-proxy.md) with load balancing, retries, fallbacks, and health
checks.
+The `ai-proxy-multi` Plugin simplifies access to LLM and embedding models by
transforming Plugin configurations into the designated request format for
OpenAI, DeepSeek, Azure, AIMLAPI, Anthropic, Anthropic-native, OpenRouter,
Gemini, Vertex AI, and other OpenAI-compatible APIs. It extends the
capabilities of [`ai-proxy`](./ai-proxy.md) with load balancing, retries,
fallbacks, and health checks.
Review Comment:
Documentation clarity: this section lists providers using brand names, but
"Anthropic-native" is the config key and is shown elsewhere as
`anthropic-native`. Consider using consistent formatting (e.g., backticks and
lowercase `anthropic-native`) to avoid confusion about the exact provider value.
##########
apisix/plugins/ai-drivers/anthropic-native.lua:
##########
@@ -0,0 +1,399 @@
+--
+-- anthropic-native.lua
+-- A driver for the Anthropic Messages API native protocol (/v1/messages).
+-- Handles Anthropic-specific SSE event types, response format, and token
usage fields.
+-- Compatible with any endpoint that speaks the native Anthropic protocol,
+-- e.g. api.anthropic.com/v1/messages or
api.deepseek.com/anthropic/v1/messages.
+--
+-- Differences from openai-base:
+-- Request: removes stream_options (not supported), adds anthropic-version
header
+-- Response: content[].text (not choices[].message.content)
+-- SSE text: event=content_block_delta, delta.type=text_delta, delta.text
+-- SSE token: message_start -> input_tokens; message_delta -> output_tokens
+-- SSE end: event=message_stop (no [DONE] sentinel)
+-- Token fields: input_tokens / output_tokens (not prompt_tokens /
completion_tokens)
+--
+
+local _M = {}
+
+local mt = { __index = _M }
+
+local CONTENT_TYPE_JSON = "application/json"
+local ANTHROPIC_VERSION = "2023-06-01"
+
+local core = require("apisix.core")
+local plugin = require("apisix.plugin")
+local http = require("resty.http")
+local url = require("socket.url")
+local sse = require("apisix.plugins.ai-drivers.sse")
+
+local ngx = ngx
+local ngx_now = ngx.now
+local table = table
+local pairs = pairs
+local type = type
+local math = math
+local ipairs = ipairs
+local setmetatable = setmetatable
+local str_lower = string.lower
+
+local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR
+local HTTP_GATEWAY_TIMEOUT = ngx.HTTP_GATEWAY_TIMEOUT
+
+
--- Construct a driver instance whose methods resolve through this module.
-- @tparam table|nil opt optional pre-populated fields for the instance
-- @treturn table opt itself (or a fresh table) with the module metatable set
function _M.new(opt)
    local instance = opt or {}
    return setmetatable(instance, mt)
end
+
+
--- Validate the inbound client request (same contract as openai-base):
-- the Content-Type must be JSON and the body must decode as JSON.
-- @treturn table|nil the decoded JSON request body on success
-- @treturn string|nil an error description on failure
function _M.validate_request(ctx)
    local content_type = core.request.header(ctx, "Content-Type") or CONTENT_TYPE_JSON
    if not core.string.has_prefix(content_type, CONTENT_TYPE_JSON) then
        return nil, "unsupported content-type: " .. content_type
    end

    local body, err = core.request.get_json_request_body_table()
    if not body then
        return nil, err
    end
    return body, nil
end
+
+
--- Map an upstream transport error string to an HTTP status code:
-- timeouts become 504 Gateway Timeout, everything else 500.
-- (Both constants are truthy numbers, so the and/or ternary is safe.)
local function handle_error(err)
    return core.string.find(err, "timeout")
           and HTTP_GATEWAY_TIMEOUT
           or HTTP_INTERNAL_SERVER_ERROR
end
+
+
--- Assemble the header set forwarded to the upstream AI endpoint.
-- Caller-supplied ext_opts headers (keys lowercased) override client
-- headers; host and content-length are always stripped, and the
-- mandatory anthropic-version header is injected when absent.
local function construct_forward_headers(ext_opts_headers, ctx)
    local normalized = {}
    for name, value in pairs(ext_opts_headers or {}) do
        normalized[str_lower(name)] = value
    end

    local headers = core.table.merge(core.request.headers(ctx), normalized)
    headers["Content-Type"] = CONTENT_TYPE_JSON
    -- the native Anthropic protocol rejects requests lacking a version header
    if not headers["anthropic-version"] then
        headers["anthropic-version"] = ANTHROPIC_VERSION
    end

    -- NOTE(review): assumes client header keys are already lowercase
    -- (ngx.req.get_headers default), so these removals match; also confirm
    -- the mixed-case "Content-Type" set above does not coexist with a
    -- lowercase "content-type" entry from the client when serialized.
    headers["host"] = nil
    headers["content-length"] = nil
    return headers
end
+
+
--- Concatenate the text blocks of an Anthropic non-streaming response.
-- Response shape: { content = [{type="text", text="..."}, ...] }.
-- Non-text blocks (e.g. tool_use) and malformed entries are skipped.
-- @treturn string the joined text, or "" when content is absent/not a table
local function extract_response_text(res_body)
    local blocks = res_body.content
    if type(blocks) ~= "table" then
        return ""
    end

    local texts, n = {}, 0
    for _, block in ipairs(blocks) do
        if type(block) == "table" and block.type == "text"
                and type(block.text) == "string" then
            n = n + 1
            texts[n] = block.text
        end
    end
    return table.concat(texts)
end
+
+
--- Relay the upstream AI response to the client while collecting
-- observability data (token usage, response text, timing variables).
--
-- Two transports are handled:
--   * text/event-stream: decode Anthropic SSE events chunk by chunk
--     (message_start -> input_tokens; content_block_delta -> text;
--      message_delta -> final output_tokens; message_stop -> end of stream)
--   * plain JSON: read the whole body, run the optional response_filter,
--     then extract usage and text from the Anthropic response shape.
--
-- @param conf plugin configuration (passed through to response_filter)
-- @param ctx  request context; writes ctx.ai_token_usage, ctx.llm_raw_usage
--             and several ctx.var llm_* / timing variables
-- @param res  upstream resty.http response object
-- @param response_filter optional hook(conf, ctx, resp) applied on the
--             non-streaming path; may replace resp.body / resp.headers
-- @return nil on success (body already emitted), or an HTTP status code
--             (and optionally an error) on failure
local function read_response(conf, ctx, res, response_filter)
    local body_reader = res.body_reader
    if not body_reader then
        core.log.warn("AI service sent no response body")
        return HTTP_INTERNAL_SERVER_ERROR
    end

    -- Mirror the upstream content type back to the client.
    local content_type = res.headers["Content-Type"]
    core.response.set_header("Content-Type", content_type)

    -- ── Streaming path ────────────────────────────────────────────────────────
    if content_type and core.string.find(content_type, "text/event-stream") then
        -- Accumulates every text_delta so the full response text can be
        -- stored once the final usage arrives in message_delta.
        local contents = {}
        while true do
            local chunk, err = body_reader()
            -- Refresh upstream response time on every chunk read.
            ctx.var.apisix_upstream_response_time =
                math.floor((ngx_now() - ctx.llm_request_start_time) * 1000)

            if err then
                core.log.warn("failed to read response chunk: ", err)
                return handle_error(err)
            end
            if not chunk then
                return -- stream finished
            end

            local events = sse.decode(chunk)
            ctx.llm_response_contents_in_chunk = {}

            for _, event in ipairs(events) do
                -- Skip empty data and keep-alive ping events.
                if event.data == "" or event.type == "ping" then
                    goto CONTINUE
                end

                -- sse.lua maps "data: [DONE]" to type="done" — some
                -- Anthropic-compatible endpoints (e.g. DeepSeek) append this
                -- OpenAI sentinel after message_stop. Safe to ignore; stream
                -- completion was already handled via message_stop.
                if event.type == "done" then
                    goto CONTINUE
                end

                -- message_stop: stream is done (standard Anthropic
                -- end-of-stream event).
                if event.type == "message_stop" then
                    ctx.var.llm_request_done = true
                    goto CONTINUE
                end

                -- error event: log it; the raw chunk is still forwarded to
                -- the client below. e.g.
                -- {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}
                if event.type == "error" then
                    core.log.warn("received error event from anthropic stream: ", event.data)
                    goto CONTINUE
                end

                -- Remaining event types carry a JSON payload.
                local data, decode_err = core.json.decode(event.data)
                if not data then
                    core.log.warn("failed to decode SSE data: ", decode_err)
                    goto CONTINUE
                end

                -- message_start: carries input_tokens in message.usage.
                -- NOTE: output_tokens here is a pre-allocated value (usually
                -- 1), NOT the final count. Only input_tokens is stored;
                -- output_tokens is finalised in message_delta.
                if event.type == "message_start" then
                    local usage = data.message and data.message.usage
                    if usage and type(usage) == "table" then
                        ctx.llm_raw_usage = ctx.llm_raw_usage or {}
                        ctx.llm_raw_usage.input_tokens = usage.input_tokens or 0
                    end
                    goto CONTINUE
                end

                -- content_block_delta: carries text chunks (text_delta) or
                -- tool input (input_json_delta). Only text_delta is collected
                -- for llm_response_text. TTFT is recorded on the first
                -- text_delta (true "first token").
                if event.type == "content_block_delta" then
                    local delta = data.delta
                    if type(delta) == "table" and delta.type == "text_delta"
                            and type(delta.text) == "string" then
                        -- Record TTFT on first actual text token.
                        if ctx.var.llm_time_to_first_token == "0" then
                            ctx.var.llm_time_to_first_token =
                                math.floor((ngx_now() - ctx.llm_request_start_time) * 1000)
                        end
                        core.table.insert(contents, delta.text)
                        core.table.insert(ctx.llm_response_contents_in_chunk, delta.text)
                    end
                    goto CONTINUE
                end

                -- message_delta: carries the final output_tokens count in
                -- usage — the authoritative token count for the completed
                -- response. Usage variables and the full response text are
                -- committed here.
                if event.type == "message_delta" then
                    local usage = data.usage
                    if usage and type(usage) == "table" then
                        ctx.llm_raw_usage = ctx.llm_raw_usage or {}
                        ctx.llm_raw_usage.output_tokens = usage.output_tokens or 0
                        local u = ctx.llm_raw_usage
                        local pt = u.input_tokens or 0
                        local ct = u.output_tokens or 0
                        ctx.ai_token_usage = {
                            prompt_tokens = pt,
                            completion_tokens = ct,
                            total_tokens = pt + ct,
                        }
                        ctx.var.llm_prompt_tokens = pt
                        ctx.var.llm_completion_tokens = ct
                        ctx.var.llm_response_text = table.concat(contents, "")
                        core.log.warn("got token usage from ai service (anthropic-native): ",
                                      core.json.delay_encode(ctx.ai_token_usage))
                    end
                    goto CONTINUE
                end

                -- content_block_start / content_block_stop / unknown: pass
                -- through silently (the raw chunk is still forwarded below).
                ::CONTINUE::
            end

            -- Forward the raw chunk to the client via the response filter.
            plugin.lua_response_filter(ctx, res.headers, chunk)
        end
        -- Unreachable: the while-true loop above only exits via return.
        return -- streaming done
    end

    -- ── Non-streaming path ────────────────────────────────────────────────
    local headers = res.headers
    local raw_res_body, err = res:read_body()
    if not raw_res_body then
        core.log.warn("failed to read response body: ", err)
        return handle_error(err)
    end

    ngx.status = res.status
    -- For a non-streamed body, TTFT equals the full upstream response time.
    ctx.var.llm_time_to_first_token =
        math.floor((ngx_now() - ctx.llm_request_start_time) * 1000)
    ctx.var.apisix_upstream_response_time = ctx.var.llm_time_to_first_token

    local res_body, decode_err = core.json.decode(raw_res_body)
    if decode_err then
        -- Non-JSON body: forward it unchanged; token usage is unavailable.
        core.log.warn("invalid response body from ai service: ", raw_res_body,
                      " err: ", decode_err, ", token usage not available")
    else
        if response_filter then
            -- The filter may short-circuit with a status code, or rewrite
            -- resp.body / resp.headers in place.
            local resp = { headers = headers, body = res_body }
            local code, ferr = response_filter(conf, ctx, resp)
            if code then
                return code, ferr
            end
            if resp.body then
                local body, encode_err = core.json.encode(resp.body)
                if not body then
                    core.log.error("failed to encode response body: ", encode_err)
                    return 500
                end
                raw_res_body = body
                res_body = resp.body
            end
            headers = resp.headers
        end

        -- Extract token usage: Anthropic uses input_tokens / output_tokens
        -- (the prompt_/completion_ fallbacks cover OpenAI-style upstreams).
        ctx.ai_token_usage = {}
        if type(res_body.usage) == "table" then
            ctx.llm_raw_usage = res_body.usage
            local pt = res_body.usage.input_tokens or res_body.usage.prompt_tokens or 0
            local ct = res_body.usage.output_tokens or res_body.usage.completion_tokens or 0
            ctx.ai_token_usage = {
                prompt_tokens = pt,
                completion_tokens = ct,
                total_tokens = res_body.usage.total_tokens or (pt + ct),
            }
            core.log.warn("got token usage from ai service (anthropic-native): ",
                          core.json.delay_encode(ctx.ai_token_usage))
        end
        ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens or 0
        ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens or 0

        -- Extract response text from the Anthropic content[] array.
        ctx.var.llm_response_text = extract_response_text(res_body)
    end

    plugin.lua_response_filter(ctx, headers, raw_res_body)
end
+
+
+function _M.request(self, ctx, conf, request_table, extra_opts)
+ local httpc, err = http.new()
+ if not httpc then
+ core.log.error("failed to create http client: ", err)
+ return HTTP_INTERNAL_SERVER_ERROR
+ end
+ httpc:set_timeout(conf.timeout)
+
+ -- Anthropic native protocol does NOT support stream_options
+ -- (that is an OpenAI extension); remove it to avoid upstream errors
+ request_table.stream_options = nil
+
+ local endpoint = extra_opts.endpoint
+ local parsed_url
+ if endpoint then
+ parsed_url = url.parse(endpoint)
+ end
+
+ local scheme = parsed_url and parsed_url.scheme or "https"
+ local host = parsed_url and parsed_url.host or self.host or
"api.anthropic.com"
+ local port = parsed_url and parsed_url.port
Review Comment:
`anthropic-native` is required to work with the ai-proxy-multi health-check
/ endpoint resolution logic. `ai-proxy-multi.resolve_endpoint()` falls back to
reading `ai_driver.host`/`ai_driver.port` when `override.endpoint` is not set,
but this driver module doesn't define those fields (unlike the other
providers). This will make endpoint resolution/health checks fail when users
rely on the default Anthropic endpoint. Define `host`, `port` (and ideally
`path`) on the exported driver object (or return an instantiated driver like
the other providers) so the default endpoint can be resolved consistently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]