branch: externals/llm commit 83f05280fc840c6191b84cf8735f2d1b01d08875 Author: Roman Scherer <ro...@burningswell.com> Commit: Roman Scherer <ro...@burningswell.com>
Strip plz changes and add JSON array stream media type - This removes the process slot from the plz response struct which I added initially. We don't need it when using the :as 'buffer option when calling out to plz. We now only depend on one additional change in plz, which is setting the process filter via an option. - This adds the plz-media-type:application/json-array media type which can be used to stream a JSON array as used by Vertex/Gemini. It's a poor mans streaming JSON parser. You can register a handler that gets called for each object in the outer most JSON array of the response. You can use it in a similar way as the event stream handler. ``` (let* ((api-key "my-key") (project "my-project")) (plz-media-type-request 'post (format (concat "https://us-central1-aiplatform.googleapis.com" "/v1/projects/%s/locations" "/us-central1/publishers/google/models" "/gemini-1.0-pro:streamGenerateContent") project) :as `(media-types (("application/json" . ,(plz-media-type:application/json-array :handler (lambda (object) (message "Object: %s" object)))))) :body (json-encode `((contents . [((role . "user") (parts . [((text . "Hello"))]))]) (generation_config (maxOutputTokens . 2048) (temperature . 1) (topP . 0.4)) (safetySettings . [((category . "HARM_CATEGORY_HATE_SPEECH") (threshold . "BLOCK_MEDIUM_AND_ABOVE")) ((category . "HARM_CATEGORY_DANGEROUS_CONTENT") (threshold . "BLOCK_MEDIUM_AND_ABOVE")) ((category . "HARM_CATEGORY_SEXUALLY_EXPLICIT") (threshold . "BLOCK_MEDIUM_AND_ABOVE")) ((category . "HARM_CATEGORY_HARASSMENT") (threshold . "BLOCK_MEDIUM_AND_ABOVE"))]))) :headers `(("Authorization" . ,(format "Bearer %s" api-key)) ("Content-Type" . "application/json")) :then (lambda (response) (message "Done.")))) ``` See the tests in [1] for more examples. Unfortunatly with Gemini/Vertex wanting a streaming parser for application/json we need to distinguish now 2 cases: - Handle application/json in streaming way - Handle application/json in non-streaming way In order to use this, I would suggest to pass the media types via a :media-types option to the following functions: - llm-request-plz-event-stream - llm-request-plz-sync - llm-request-plz-async We could use the plz-media-types as the default, but for any media type that needs to register event handlers for a streaming media type, I think it's best if we wire the list of media types (and the callbacks) together in each provider. Wdyt? [1] https://github.com/r0man/plz.el/blob/plz-media-type/tests/test-plz-media-type.el#L227-L256 --- plz-event-source.el | 26 +++--- plz-media-type.el | 262 +++++++++++++++++++++++++++++++++------------------- plz.el | 13 +-- 3 files changed, 186 insertions(+), 115 deletions(-) diff --git a/plz-event-source.el b/plz-event-source.el index c72e7bb7fa..c98d6f86cb 100644 --- a/plz-event-source.el +++ b/plz-event-source.el @@ -403,16 +403,16 @@ (defclass plz-media-type:text/event-stream (plz-media-type:application/octet-stream) ((name :initform "text/event-stream") (events :documentation "Association list from event type to handler." - :initarg :events))) + :initarg :events + :initform nil + :type list))) -(defun plz-media-type:text/event-stream--event-source (response) - "Return the event source of the RESPONSE." - (process-get (plz-response-process response) :plz-event-source)) +(defvar-local plz-event-source--current nil + "The event source of the current buffer.") (cl-defmethod plz-media-type-else ((_ plz-media-type:text/event-stream) error) "Transform the ERROR into a format suitable for MEDIA-TYPE." - (let* ((response (plz-error-response error)) - (source (plz-media-type:text/event-stream--event-source response)) + (let* ((source plz-event-source--current) (event (plz-event-source-event :type "error" :data error))) (plz-event-source-close source) (plz-event-source-dispatch-event source event) @@ -422,7 +422,7 @@ "Process the CHUNK according to MEDIA-TYPE using PROCESS." (when (buffer-live-p (process-buffer process)) (with-current-buffer (process-buffer process) - (unless (process-get process :plz-event-source) + (unless plz-event-source--current (let* ((response (make-plz-response :status (plz-response-status chunk) :headers (plz-response-headers chunk))) @@ -444,15 +444,13 @@ (funcall handler source event)))) (t pair)))) (oref media-type events)))))) - (process-put process :plz-event-source source))) - (plz-event-source-insert (process-get process :plz-event-source) - (plz-response-body chunk))))) + (setq-local plz-event-source--current source))) + (plz-event-source-insert plz-event-source--current (plz-response-body chunk))))) -(cl-defmethod plz-media-type-then ((_ plz-media-type:text/event-stream) response) +(cl-defmethod plz-media-type-then ((media-type plz-media-type:text/event-stream) response) "Transform the RESPONSE into a format suitable for MEDIA-TYPE." - (let ((source (plz-media-type:text/event-stream--event-source response))) - (plz-event-source-close source) - response)) + (plz-event-source-close plz-event-source--current) + (cl-call-next-method media-type response)) (provide 'plz-event-source) ;;; plz-event-source.el ends here diff --git a/plz-media-type.el b/plz-media-type.el index 156bb75fd5..4b1f493e66 100644 --- a/plz-media-type.el +++ b/plz-media-type.el @@ -34,9 +34,9 @@ (require 'eieio) (require 'plz) -(defclass plz:media-type () +(defclass plz-media-type () ((name - :documentation "The MIME Type of the handler." + :documentation "The name of the media type." :initarg :name :initform "application/octet-stream" :type string))) @@ -67,6 +67,15 @@ (let ((media-type (plz-media-type--content-type response))) (plz-media--type-find media-types media-type))) +(defvar-local plz-media-type--current nil + "The media type of the process buffer.") + +(defvar-local plz-media-type--position nil + "The position in the process buffer.") + +(defvar-local plz-media-type--response nil + "The response of the process buffer.") + (defun plz-media-type-process-filter (process media-types chunk) "The process filter that handles different content types. @@ -79,8 +88,8 @@ CHUNK is a part of the HTTP body." (when (buffer-live-p (process-buffer process)) (with-current-buffer (process-buffer process) (let ((moving (= (point) (process-mark process)))) - (if-let (media-type (process-get process :plz-media-type)) - (let ((response (process-get process :plz-media-type-response))) + (if-let (media-type plz-media-type--current) + (let ((response plz-media-type--response)) (setf (plz-response-body response) chunk) (plz-media-type-process media-type process response)) (progn @@ -94,20 +103,20 @@ CHUNK is a part of the HTTP body." (goto-char (point-min)) (let* ((response (prog1 (plz--response) (widen))) (media-type (plz-media-type--of-response media-types response))) - (process-put process :plz-media-type media-type) + (setq-local plz-media-type--current media-type) (when-let (body (plz-response-body response)) (when (> (length body) 0) (delete-region body-start (point-max)) (set-marker (process-mark process) (point)) (plz-media-type-process media-type process response))) (setf (plz-response-body response) nil) - (process-put process :plz-media-type-response response)))))) + (setq-local plz-media-type--response response)))))) (when moving (goto-char (process-mark process))))))) ;; Content Type: application/octet-stream -(defclass plz-media-type:application/octet-stream (plz:media-type) +(defclass plz-media-type:application/octet-stream (plz-media-type) ((name :initform "application/octet-stream"))) (cl-defmethod plz-media-type-else ((media-type plz-media-type:application/octet-stream) error) @@ -119,49 +128,115 @@ CHUNK is a part of the HTTP body." (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/octet-stream) response) "Transform the RESPONSE into a format suitable for MEDIA-TYPE." (ignore media-type) + (setf (plz-response-body response) (buffer-string)) response) (cl-defmethod plz-media-type-process ((media-type plz-media-type:application/octet-stream) process chunk) "Process the CHUNK according to MEDIA-TYPE using PROCESS." (ignore media-type) - (when (buffer-live-p (process-buffer process)) - (with-current-buffer (process-buffer process) - (let ((moving (= (point) (process-mark process)))) - (save-excursion - (goto-char (process-mark process)) - (insert (plz-response-body chunk)) - (set-marker (process-mark process) (point))) - (when moving - (goto-char (process-mark process))))))) + (save-excursion + (goto-char (process-mark process)) + (insert (plz-response-body chunk)) + (set-marker (process-mark process) (point)))) ;; Content Type: application/json (defclass plz-media-type:application/json (plz-media-type:application/octet-stream) - ((name :initform "application/json") - (array-type :initform 'array) - (false-object :initform :json-false) - (null-object :initform nil) - (object-type :initform 'alist))) + ((name + :initform "application/json") + (array-type + :documentation "Specifies which Lisp type is used to represent arrays. It can be +`array' (the default) or `list'." + :initarg :array-type + :initform 'array + :type symbol) + (false-object + :documentation "Specifies which object to use to represent a JSON false value. It +defaults to `:json-false'." + :initarg :false-object + :initform :json-false) + (null-object + :documentation "Specifies which object to use to represent a JSON null value. It +defaults to `nil`." + :initarg :null-object + :initform nil) + (object-type + :documentation "Specifies which Lisp type is used to represent objects. It can +be `hash-table', `alist' (the default) or `plist'." + :initarg :object-type + :initform 'alist + :type symbol))) + +(defun plz-media-type--parse-json-object (media-type) + "Parse the JSON object in the current buffer according to MEDIA-TYPE." + (with-slots (array-type false-object null-object object-type) media-type + (json-parse-buffer :array-type array-type + :false-object false-object + :null-object null-object + :object-type object-type)) ) (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/json) response) "Transform the RESPONSE into a format suitable for MEDIA-TYPE." - (with-slots (array-type false-object null-object object-type) media-type - (setf (plz-response-body response) - (with-temp-buffer - (insert (plz-response-body response)) - (goto-char (point-min)) - (json-parse-buffer :array-type array-type - :false-object false-object - :null-object null-object - :object-type object-type))) - response)) + (setf (plz-response-body response) (plz-media-type--parse-json-object media-type)) + response) + +;; Content Type: application/json (array of objects) + +(defclass plz-media-type:application/json-array (plz-media-type:application/json) + ((handler + :documentation "A function that will be called for each object in the JSON array." + :initarg :handler + :type (or function symbol)))) + +(defun plz-media-type:application/json-array--parse-next (media-type) + "Parse a single line of the newline delimited JSON MEDIA-TYPE." + (cond ((looking-at "\\[") + (delete-char 1)) + ((looking-at "[ ,\n\r]") + (delete-char 1)) + ((looking-at "\\]") + (delete-char 1)) + ((not (eobp)) + (ignore-errors + (let ((begin (point))) + (prog1 (plz-media-type--parse-json-object media-type) + (delete-region begin (point)))))))) + +(defun plz-media-type:application/json-array--parse-stream (media-type) + "Parse all lines of the newline delimited JSON MEDIA-TYPE in the PROCESS buffer." + (with-slots (handler) media-type + (unless plz-media-type--position + (setq-local plz-media-type--position (point))) + (goto-char plz-media-type--position) + (let ((object (plz-media-type:application/json-array--parse-next media-type))) + (setq-local plz-media-type--position (point)) + (while object + (setq-local plz-media-type--position (point)) + (when (functionp handler) + (funcall handler object)) + (setq object (plz-media-type:application/json-array--parse-next media-type)))))) + +(cl-defmethod plz-media-type-process ((media-type plz-media-type:application/json-array) process chunk) + "Process the CHUNK according to MEDIA-TYPE using PROCESS." + (ignore media-type) + (cl-call-next-method media-type process chunk) + (plz-media-type:application/json-array--parse-stream media-type)) + +(cl-defmethod plz-media-type-then ((media-type plz-media-type:application/json-array) response) + "Transform the RESPONSE into a format suitable for MEDIA-TYPE." + (ignore media-type) + (plz-media-type:application/json-array--parse-stream media-type) + response) ;; Content Type: application/x-ndjson (defclass plz-media-type:application/x-ndjson (plz-media-type:application/json) ((name :initform "application/x-ndjson") - (handler :documentation "The handler that will be called for each JSON object in the response." - :initarg :handler))) + (handler + :documentation "A function that will be called for each line that contains a JSON object." + :initarg :handler + :initform nil + :type (or function null symbol)))) (defconst plz-media-type:application/x-ndjson--line-regexp (rx (* not-newline) (or "\r\n" "\n" "\r")) @@ -170,37 +245,30 @@ CHUNK is a part of the HTTP body." (defun plz-media-type:application/x-ndjson--parse-line (media-type) "Parse a single line of the newline delimited JSON MEDIA-TYPE." (when (looking-at plz-media-type:application/x-ndjson--line-regexp) - (when-let (line (delete-and-extract-region (match-beginning 0) (match-end 0))) - (with-slots (array-type false-object null-object object-type) media-type - (json-parse-string line - :array-type array-type - :false-object false-object - :null-object null-object - :object-type object-type))))) - -(defun plz-media-type:application/x-ndjson--parse-stream (media-type process) + (prog1 (plz-media-type--parse-json-object media-type) + (delete-region (match-beginning 0) (match-end 0))))) + +(defun plz-media-type:application/x-ndjson--parse-stream (media-type) "Parse all lines of the newline delimited JSON MEDIA-TYPE in the PROCESS buffer." (with-slots (handler) media-type - (goto-char (process-get process :plz-media-type:application/x-ndjson-position)) + (unless plz-media-type--position + (setq-local plz-media-type--position (point))) + (goto-char plz-media-type--position) (when-let (object (plz-media-type:application/x-ndjson--parse-line media-type)) (while object - (process-put process :plz-media-type:application/x-ndjson-position (point)) + (setq-local plz-media-type--position (point)) (when (functionp handler) (funcall handler object)) (setq object (plz-media-type:application/x-ndjson--parse-line media-type)))))) (cl-defmethod plz-media-type-process ((media-type plz-media-type:application/x-ndjson) process chunk) "Process the CHUNK according to MEDIA-TYPE using PROCESS." - (when (buffer-live-p (process-buffer process)) - (with-current-buffer (process-buffer process) - (unless (process-get process :plz-media-type:application/x-ndjson-position) - (process-put process :plz-media-type:application/x-ndjson-position (point))) - (cl-call-next-method media-type process chunk) - (plz-media-type:application/x-ndjson--parse-stream media-type process)))) + (cl-call-next-method media-type process chunk) + (plz-media-type:application/x-ndjson--parse-stream media-type)) (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/x-ndjson) response) "Transform the RESPONSE into a format suitable for MEDIA-TYPE." - (plz-media-type:application/x-ndjson--parse-stream media-type (plz-response-process response)) + (plz-media-type:application/x-ndjson--parse-stream media-type) response) ;; Content Type: application/xml @@ -211,10 +279,7 @@ CHUNK is a part of the HTTP body." (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/xml) response) "Transform the RESPONSE into a format suitable for MEDIA-TYPE." (with-slots (array-type false-object null-object object-type) media-type - (setf (plz-response-body response) - (with-temp-buffer - (insert (plz-response-body response)) - (libxml-parse-html-region))) + (setf (plz-response-body response) (libxml-parse-html-region)) response)) ;; Content Type: text/html @@ -224,12 +289,26 @@ CHUNK is a part of the HTTP body." (defvar plz-media-types `(("application/json" . ,(plz-media-type:application/json)) - ("application/octet-stream" . ,(plz-media-type:application/json)) + ("application/octet-stream" . ,(plz-media-type:application/octet-stream)) ("application/xml" . ,(plz-media-type:application/xml)) ("text/html" . ,(plz-media-type:text/html)) (t . ,(plz-media-type:application/octet-stream))) "Alist from media type to content type.") +(defun plz-media-type--handle-sync-error (media-types error) + "Handle the synchronous ERROR of type `plz-http-error' with MEDIA-TYPES." + (let* ((msg (cadr error)) + (plzerror (caddr error))) + (signal (car error) + (let ((response (plz-error-response plzerror))) + (if-let (media-type (plz-media-type--of-response media-types response)) + (list msg (with-temp-buffer + (when-let (body (plz-response-body response)) + (insert body) + (goto-char (point-min))) + (plz-media-type-else media-type plzerror))) + (cdr error)))))) + (cl-defun plz-media-type-request (method url @@ -333,43 +412,40 @@ not. (if-let (media-types (pcase as (`(media-types ,media-types) media-types))) - (let* ((plz-curl-default-args (cons "--no-buffer" plz-curl-default-args)) - (result (plz method url - :as 'response - :body body - :body-type body-type - :connect-timeout connect-timeout - :decode decode - :else (when (functionp else) - (lambda (object) - (let* ((media-type (plz-media-type--of-response media-types (plz-error-response object))) - (object (plz-media-type-else media-type object))) - (funcall else object)))) - :finally (when (functionp finally) - (lambda () (funcall finally))) - :headers headers - :noquery noquery - :process-filter (lambda (process chunk) - (plz-media-type-process-filter process media-types chunk)) - :timeout timeout - :then (cond - ((symbolp then) then) - ((functionp then) - (lambda (object) - (let* ((media-type (plz-media-type--of-response media-types object)) - (object (plz-media-type-then media-type object))) - (funcall then object)))))))) - ;; TODO: Handle sync event stream - (cond - ((processp result) - result) - ((plz-response-p result) - (let ((media-type (plz-media-type--of-response media-types result))) - (plz-media-type-then media-type result))) - ((plz-error-p result) - (let ((media-type (plz-media-type--of-response media-types (plz-error-response result)))) - (plz-media-type-else media-type result))) - (t result))) + (condition-case error + (let* ((plz-curl-default-args (cons "--no-buffer" plz-curl-default-args)) + (result (plz method url + :as 'buffer + :body body + :body-type body-type + :connect-timeout connect-timeout + :decode decode + :else (when (functionp else) + (lambda (error) + (funcall else (plz-media-type-else + plz-media-type--current + error)))) + :finally (when (functionp finally) + (lambda () (funcall finally))) + :headers headers + :noquery noquery + :process-filter (lambda (process chunk) + (plz-media-type-process-filter process media-types chunk)) + :timeout timeout + :then (cond + ((symbolp then) then) + ((functionp then) + (lambda (_) + (funcall then (plz-media-type-then + plz-media-type--current + plz-media-type--response)))))))) + (cond ((bufferp result) + (with-current-buffer result + (plz-media-type-then plz-media-type--current plz-media-type--response))) + ((processp result) + result) + (t (user-error "Unexpected response: %s" result)))) + (plz-error (plz-media-type--handle-sync-error media-types error))) (apply #'plz (append (list method url) rest)))) ;;;; Footer diff --git a/plz.el b/plz.el index 0739a20b04..69072063c7 100644 --- a/plz.el +++ b/plz.el @@ -110,7 +110,7 @@ ;;;; Structs (cl-defstruct plz-response - version status headers body process) + version status headers body) (cl-defstruct plz-error curl-error response message) @@ -439,7 +439,7 @@ NOQUERY is passed to `make-process', which see. (decode-coding-region (point) (point-max) coding-system))) (funcall then (current-buffer))))) ('response (lambda () - (funcall then (or (plz--response :decode-p decode :process process) + (funcall then (or (plz--response :decode-p decode) (make-plz-error :message (format "response is nil for buffer:%S buffer-string:%S" process-buffer (buffer-string))))))) ('file (lambda () @@ -771,7 +771,7 @@ argument passed to `plz--sentinel', which see." ;; Any other status code is considered unsuccessful ;; (for now, anyway). - (let ((err (make-plz-error :response (plz--response :process process)))) + (let ((err (make-plz-error :response (plz--response)))) (pcase-exhaustive (process-get process :plz-else) (`nil (process-put process :plz-result err)) ((and (pred functionp) fn) (funcall fn err))))))) @@ -837,13 +837,11 @@ Arguments are PROCESS and STATUS (ok, checkdoc?)." (or (re-search-forward "\r\n\r\n" nil t) (signal 'plz-http-error '("plz--response: End of redirect headers not found"))))) -(cl-defun plz--response (&key (decode-p t) process) +(cl-defun plz--response (&key (decode-p t)) "Return response structure for HTTP response in current buffer. When DECODE-P is non-nil, decode the response body automatically according to the apparent coding system. -PROCESS is the curl process object that made the request. - Assumes that point is at beginning of HTTP response." (save-excursion ;; Parse HTTP version and status code. @@ -862,8 +860,7 @@ Assumes that point is at beginning of HTTP response." :version http-version :status status-code :headers headers - :body (buffer-string) - :process process)))) + :body (buffer-string))))) (defun plz--coding-system (&optional headers) "Return coding system for HTTP response in current buffer.