branch: externals/llm
commit 83f05280fc840c6191b84cf8735f2d1b01d08875
Author: Roman Scherer <ro...@burningswell.com>
Commit: Roman Scherer <ro...@burningswell.com>

    Strip plz changes and add JSON array stream media type
    
    - This removes the process slot from the plz response struct which I
    added initially. We don't need it when using the :as 'buffer option
    when calling out to plz. We now only depend on one additional change
    in plz, which is setting the process filter via an option.
    
    - This adds the plz-media-type:application/json-array media type which
    can be used to stream a JSON array as used by Vertex/Gemini. It's a
    poor man's streaming JSON parser. You can register a handler that gets
    called for each object in the outermost JSON array of the
    response. You can use it in a similar way to the event stream handler.
    
    ```
    (let* ((api-key "my-key")
           (project "my-project"))
      (plz-media-type-request
        'post (format
               (concat "https://us-central1-aiplatform.googleapis.com"
                       "/v1/projects/%s/locations"
                       "/us-central1/publishers/google/models"
                       "/gemini-1.0-pro:streamGenerateContent")
               project)
        :as `(media-types (("application/json"
                            . ,(plz-media-type:application/json-array
                                :handler (lambda (object)
                                           (message "Object: %s" object))))))
        :body (json-encode
               `((contents .
                           [((role . "user")
                             (parts .
                                    [((text . "Hello"))]))])
                 (generation_config
                  (maxOutputTokens . 2048)
                  (temperature . 1)
                  (topP . 0.4))
                 (safetySettings .
                                 [((category . "HARM_CATEGORY_HATE_SPEECH")
                                   (threshold . "BLOCK_MEDIUM_AND_ABOVE"))
                                  ((category . "HARM_CATEGORY_DANGEROUS_CONTENT")
                                   (threshold . "BLOCK_MEDIUM_AND_ABOVE"))
                                  ((category . "HARM_CATEGORY_SEXUALLY_EXPLICIT")
                                   (threshold . "BLOCK_MEDIUM_AND_ABOVE"))
                                  ((category . "HARM_CATEGORY_HARASSMENT")
                                   (threshold . "BLOCK_MEDIUM_AND_ABOVE"))])))
        :headers `(("Authorization" . ,(format "Bearer %s" api-key))
                   ("Content-Type" . "application/json"))
        :then (lambda (response)
                (message "Done."))))
    ```
    
    See the tests in [1] for more examples.
    
    Unfortunately, because Gemini/Vertex needs a streaming parser for
    application/json, we now need to distinguish two cases:
    
    - Handle application/json in a streaming way
    - Handle application/json in a non-streaming way
    
    In order to use this, I would suggest to pass the media types via a
    :media-types option to the following functions:
    
    - llm-request-plz-event-stream
    - llm-request-plz-sync
    - llm-request-plz-async
    
    We could use the plz-media-types as the default, but for any media
    type that needs to register event handlers for a streaming media type,
    I think it's best if we wire the list of media types (and the
    callbacks) together in each provider.
    
    Wdyt?
    
    [1] https://github.com/r0man/plz.el/blob/plz-media-type/tests/test-plz-media-type.el#L227-L256
---
 plz-event-source.el |  26 +++---
 plz-media-type.el   | 262 +++++++++++++++++++++++++++++++++-------------------
 plz.el              |  13 +--
 3 files changed, 186 insertions(+), 115 deletions(-)

diff --git a/plz-event-source.el b/plz-event-source.el
index c72e7bb7fa..c98d6f86cb 100644
--- a/plz-event-source.el
+++ b/plz-event-source.el
@@ -403,16 +403,16 @@
 (defclass plz-media-type:text/event-stream 
(plz-media-type:application/octet-stream)
   ((name :initform "text/event-stream")
    (events :documentation "Association list from event type to handler."
-           :initarg :events)))
+           :initarg :events
+           :initform nil
+           :type list)))
 
-(defun plz-media-type:text/event-stream--event-source (response)
-  "Return the event source of the RESPONSE."
-  (process-get (plz-response-process response) :plz-event-source))
+(defvar-local plz-event-source--current nil
+  "The event source of the current buffer.")
 
 (cl-defmethod plz-media-type-else ((_ plz-media-type:text/event-stream) error)
   "Transform the ERROR into a format suitable for MEDIA-TYPE."
-  (let* ((response (plz-error-response error))
-         (source (plz-media-type:text/event-stream--event-source response))
+  (let* ((source plz-event-source--current)
          (event (plz-event-source-event :type "error" :data error)))
     (plz-event-source-close source)
     (plz-event-source-dispatch-event source event)
@@ -422,7 +422,7 @@
   "Process the CHUNK according to MEDIA-TYPE using PROCESS."
   (when (buffer-live-p (process-buffer process))
     (with-current-buffer (process-buffer process)
-      (unless (process-get process :plz-event-source)
+      (unless plz-event-source--current
         (let* ((response (make-plz-response
                           :status (plz-response-status chunk)
                           :headers (plz-response-headers chunk)))
@@ -444,15 +444,13 @@
                                                        (funcall handler source 
event))))
                                          (t pair))))
                                     (oref media-type events))))))
-          (process-put process :plz-event-source source)))
-      (plz-event-source-insert (process-get process :plz-event-source)
-                               (plz-response-body chunk)))))
+          (setq-local plz-event-source--current source)))
+      (plz-event-source-insert plz-event-source--current (plz-response-body 
chunk)))))
 
-(cl-defmethod plz-media-type-then ((_ plz-media-type:text/event-stream) 
response)
+(cl-defmethod plz-media-type-then ((media-type 
plz-media-type:text/event-stream) response)
   "Transform the RESPONSE into a format suitable for MEDIA-TYPE."
-  (let ((source (plz-media-type:text/event-stream--event-source response)))
-    (plz-event-source-close source)
-    response))
+  (plz-event-source-close plz-event-source--current)
+  (cl-call-next-method media-type response))
 
 (provide 'plz-event-source)
 ;;; plz-event-source.el ends here
diff --git a/plz-media-type.el b/plz-media-type.el
index 156bb75fd5..4b1f493e66 100644
--- a/plz-media-type.el
+++ b/plz-media-type.el
@@ -34,9 +34,9 @@
 (require 'eieio)
 (require 'plz)
 
-(defclass plz:media-type ()
+(defclass plz-media-type ()
   ((name
-    :documentation "The MIME Type of the handler."
+    :documentation "The name of the media type."
     :initarg :name
     :initform "application/octet-stream"
     :type string)))
@@ -67,6 +67,15 @@
   (let ((media-type (plz-media-type--content-type response)))
     (plz-media--type-find media-types media-type)))
 
+(defvar-local plz-media-type--current nil
+  "The media type of the process buffer.")
+
+(defvar-local plz-media-type--position nil
+  "The position in the process buffer.")
+
+(defvar-local plz-media-type--response nil
+  "The response of the process buffer.")
+
 (defun plz-media-type-process-filter (process media-types chunk)
   "The process filter that handles different content types.
 
@@ -79,8 +88,8 @@ CHUNK is a part of the HTTP body."
   (when (buffer-live-p (process-buffer process))
     (with-current-buffer (process-buffer process)
       (let ((moving (= (point) (process-mark process))))
-        (if-let (media-type (process-get process :plz-media-type))
-            (let ((response (process-get process :plz-media-type-response)))
+        (if-let (media-type plz-media-type--current)
+            (let ((response plz-media-type--response))
               (setf (plz-response-body response) chunk)
               (plz-media-type-process media-type process response))
           (progn
@@ -94,20 +103,20 @@ CHUNK is a part of the HTTP body."
                 (goto-char (point-min))
                 (let* ((response (prog1 (plz--response) (widen)))
                        (media-type (plz-media-type--of-response media-types 
response)))
-                  (process-put process :plz-media-type media-type)
+                  (setq-local plz-media-type--current media-type)
                   (when-let (body (plz-response-body response))
                     (when (> (length body) 0)
                       (delete-region body-start (point-max))
                       (set-marker (process-mark process) (point))
                       (plz-media-type-process media-type process response)))
                   (setf (plz-response-body response) nil)
-                  (process-put process :plz-media-type-response response))))))
+                  (setq-local plz-media-type--response response))))))
         (when moving
           (goto-char (process-mark process)))))))
 
 ;; Content Type: application/octet-stream
 
-(defclass plz-media-type:application/octet-stream (plz:media-type)
+(defclass plz-media-type:application/octet-stream (plz-media-type)
   ((name :initform "application/octet-stream")))
 
 (cl-defmethod plz-media-type-else ((media-type 
plz-media-type:application/octet-stream) error)
@@ -119,49 +128,115 @@ CHUNK is a part of the HTTP body."
 (cl-defmethod plz-media-type-then ((media-type 
plz-media-type:application/octet-stream) response)
   "Transform the RESPONSE into a format suitable for MEDIA-TYPE."
   (ignore media-type)
+  (setf (plz-response-body response) (buffer-string))
   response)
 
 (cl-defmethod plz-media-type-process ((media-type 
plz-media-type:application/octet-stream) process chunk)
   "Process the CHUNK according to MEDIA-TYPE using PROCESS."
   (ignore media-type)
-  (when (buffer-live-p (process-buffer process))
-    (with-current-buffer (process-buffer process)
-      (let ((moving (= (point) (process-mark process))))
-        (save-excursion
-          (goto-char (process-mark process))
-          (insert (plz-response-body chunk))
-          (set-marker (process-mark process) (point)))
-        (when moving
-          (goto-char (process-mark process)))))))
+  (save-excursion
+    (goto-char (process-mark process))
+    (insert (plz-response-body chunk))
+    (set-marker (process-mark process) (point))))
 
 ;; Content Type: application/json
 
 (defclass plz-media-type:application/json 
(plz-media-type:application/octet-stream)
-  ((name :initform "application/json")
-   (array-type :initform 'array)
-   (false-object :initform :json-false)
-   (null-object :initform nil)
-   (object-type :initform 'alist)))
+  ((name
+    :initform "application/json")
+   (array-type
+    :documentation "Specifies which Lisp type is used to represent arrays.  It 
can be
+`array' (the default) or `list'."
+    :initarg :array-type
+    :initform 'array
+    :type symbol)
+   (false-object
+    :documentation "Specifies which object to use to represent a JSON false 
value. It
+defaults to `:json-false'."
+    :initarg :false-object
+    :initform :json-false)
+   (null-object
+    :documentation "Specifies which object to use to represent a JSON null 
value.  It
+defaults to `nil`."
+    :initarg :null-object
+    :initform nil)
+   (object-type
+    :documentation "Specifies which Lisp type is used to represent objects.  
It can
+be `hash-table', `alist' (the default) or `plist'."
+    :initarg :object-type
+    :initform 'alist
+    :type symbol)))
+
+(defun plz-media-type--parse-json-object (media-type)
+  "Parse the JSON object in the current buffer according to MEDIA-TYPE."
+  (with-slots (array-type false-object null-object object-type) media-type
+    (json-parse-buffer :array-type array-type
+                       :false-object false-object
+                       :null-object null-object
+                       :object-type object-type)) )
 
 (cl-defmethod plz-media-type-then ((media-type 
plz-media-type:application/json) response)
   "Transform the RESPONSE into a format suitable for MEDIA-TYPE."
-  (with-slots (array-type false-object null-object object-type) media-type
-    (setf (plz-response-body response)
-          (with-temp-buffer
-            (insert (plz-response-body response))
-            (goto-char (point-min))
-            (json-parse-buffer :array-type array-type
-                               :false-object false-object
-                               :null-object null-object
-                               :object-type object-type)))
-    response))
+  (setf (plz-response-body response) (plz-media-type--parse-json-object 
media-type))
+  response)
+
+;; Content Type: application/json (array of objects)
+
+(defclass plz-media-type:application/json-array 
(plz-media-type:application/json)
+  ((handler
+    :documentation "A function that will be called for each object in the JSON 
array."
+    :initarg :handler
+    :type (or function symbol))))
+
+(defun plz-media-type:application/json-array--parse-next (media-type)
+  "Parse a single line of the newline delimited JSON MEDIA-TYPE."
+  (cond ((looking-at "\\[")
+         (delete-char 1))
+        ((looking-at "[ ,\n\r]")
+         (delete-char 1))
+        ((looking-at "\\]")
+         (delete-char 1))
+        ((not (eobp))
+         (ignore-errors
+           (let ((begin (point)))
+             (prog1 (plz-media-type--parse-json-object media-type)
+               (delete-region begin (point))))))))
+
+(defun plz-media-type:application/json-array--parse-stream (media-type)
+  "Parse all lines of the newline delimited JSON MEDIA-TYPE in the PROCESS 
buffer."
+  (with-slots (handler) media-type
+    (unless plz-media-type--position
+      (setq-local plz-media-type--position (point)))
+    (goto-char plz-media-type--position)
+    (let ((object (plz-media-type:application/json-array--parse-next 
media-type)))
+      (setq-local plz-media-type--position (point))
+      (while object
+        (setq-local plz-media-type--position (point))
+        (when (functionp handler)
+          (funcall handler object))
+        (setq object (plz-media-type:application/json-array--parse-next 
media-type))))))
+
+(cl-defmethod plz-media-type-process ((media-type 
plz-media-type:application/json-array) process chunk)
+  "Process the CHUNK according to MEDIA-TYPE using PROCESS."
+  (ignore media-type)
+  (cl-call-next-method media-type process chunk)
+  (plz-media-type:application/json-array--parse-stream media-type))
+
+(cl-defmethod plz-media-type-then ((media-type 
plz-media-type:application/json-array) response)
+  "Transform the RESPONSE into a format suitable for MEDIA-TYPE."
+  (ignore media-type)
+  (plz-media-type:application/json-array--parse-stream media-type)
+  response)
 
 ;; Content Type: application/x-ndjson
 
 (defclass plz-media-type:application/x-ndjson (plz-media-type:application/json)
   ((name :initform "application/x-ndjson")
-   (handler :documentation "The handler that will be called for each JSON 
object in the response."
-            :initarg :handler)))
+   (handler
+    :documentation "A function that will be called for each line that contains 
a JSON object."
+    :initarg :handler
+    :initform nil
+    :type (or function null symbol))))
 
 (defconst plz-media-type:application/x-ndjson--line-regexp
   (rx (* not-newline) (or "\r\n" "\n" "\r"))
@@ -170,37 +245,30 @@ CHUNK is a part of the HTTP body."
 (defun plz-media-type:application/x-ndjson--parse-line (media-type)
   "Parse a single line of the newline delimited JSON MEDIA-TYPE."
   (when (looking-at plz-media-type:application/x-ndjson--line-regexp)
-    (when-let (line (delete-and-extract-region (match-beginning 0) (match-end 
0)))
-      (with-slots (array-type false-object null-object object-type) media-type
-        (json-parse-string line
-                           :array-type array-type
-                           :false-object false-object
-                           :null-object null-object
-                           :object-type object-type)))))
-
-(defun plz-media-type:application/x-ndjson--parse-stream (media-type process)
+    (prog1 (plz-media-type--parse-json-object media-type)
+      (delete-region (match-beginning 0) (match-end 0)))))
+
+(defun plz-media-type:application/x-ndjson--parse-stream (media-type)
   "Parse all lines of the newline delimited JSON MEDIA-TYPE in the PROCESS 
buffer."
   (with-slots (handler) media-type
-    (goto-char (process-get process 
:plz-media-type:application/x-ndjson-position))
+    (unless plz-media-type--position
+      (setq-local plz-media-type--position (point)))
+    (goto-char plz-media-type--position)
     (when-let (object (plz-media-type:application/x-ndjson--parse-line 
media-type))
       (while object
-        (process-put process :plz-media-type:application/x-ndjson-position 
(point))
+        (setq-local plz-media-type--position (point))
         (when (functionp handler)
           (funcall handler object))
         (setq object (plz-media-type:application/x-ndjson--parse-line 
media-type))))))
 
 (cl-defmethod plz-media-type-process ((media-type 
plz-media-type:application/x-ndjson) process chunk)
   "Process the CHUNK according to MEDIA-TYPE using PROCESS."
-  (when (buffer-live-p (process-buffer process))
-    (with-current-buffer (process-buffer process)
-      (unless (process-get process 
:plz-media-type:application/x-ndjson-position)
-        (process-put process :plz-media-type:application/x-ndjson-position 
(point)))
-      (cl-call-next-method media-type process chunk)
-      (plz-media-type:application/x-ndjson--parse-stream media-type process))))
+  (cl-call-next-method media-type process chunk)
+  (plz-media-type:application/x-ndjson--parse-stream media-type))
 
 (cl-defmethod plz-media-type-then ((media-type 
plz-media-type:application/x-ndjson) response)
   "Transform the RESPONSE into a format suitable for MEDIA-TYPE."
-  (plz-media-type:application/x-ndjson--parse-stream media-type 
(plz-response-process response))
+  (plz-media-type:application/x-ndjson--parse-stream media-type)
   response)
 
 ;; Content Type: application/xml
@@ -211,10 +279,7 @@ CHUNK is a part of the HTTP body."
 (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/xml) 
response)
   "Transform the RESPONSE into a format suitable for MEDIA-TYPE."
   (with-slots (array-type false-object null-object object-type) media-type
-    (setf (plz-response-body response)
-          (with-temp-buffer
-            (insert (plz-response-body response))
-            (libxml-parse-html-region)))
+    (setf (plz-response-body response) (libxml-parse-html-region))
     response))
 
 ;; Content Type: text/html
@@ -224,12 +289,26 @@ CHUNK is a part of the HTTP body."
 
 (defvar plz-media-types
   `(("application/json" . ,(plz-media-type:application/json))
-    ("application/octet-stream" . ,(plz-media-type:application/json))
+    ("application/octet-stream" . ,(plz-media-type:application/octet-stream))
     ("application/xml" . ,(plz-media-type:application/xml))
     ("text/html" . ,(plz-media-type:text/html))
     (t . ,(plz-media-type:application/octet-stream)))
   "Alist from media type to content type.")
 
+(defun plz-media-type--handle-sync-error (media-types error)
+  "Handle the synchronous ERROR of type `plz-http-error' with MEDIA-TYPES."
+  (let* ((msg (cadr error))
+         (plzerror (caddr error)))
+    (signal (car error)
+            (let ((response (plz-error-response plzerror)))
+              (if-let (media-type (plz-media-type--of-response media-types 
response))
+                  (list msg (with-temp-buffer
+                              (when-let (body (plz-response-body response))
+                                (insert body)
+                                (goto-char (point-min)))
+                              (plz-media-type-else media-type plzerror)))
+                (cdr error))))))
+
 (cl-defun plz-media-type-request
     (method
      url
@@ -333,43 +412,40 @@ not.
   (if-let (media-types (pcase as
                          (`(media-types ,media-types)
                           media-types)))
-      (let* ((plz-curl-default-args (cons "--no-buffer" plz-curl-default-args))
-             (result (plz method url
-                       :as 'response
-                       :body body
-                       :body-type body-type
-                       :connect-timeout connect-timeout
-                       :decode decode
-                       :else (when (functionp else)
-                               (lambda (object)
-                                 (let* ((media-type 
(plz-media-type--of-response media-types (plz-error-response object)))
-                                        (object (plz-media-type-else 
media-type object)))
-                                   (funcall else object))))
-                       :finally (when (functionp finally)
-                                  (lambda () (funcall finally)))
-                       :headers headers
-                       :noquery noquery
-                       :process-filter (lambda (process chunk)
-                                         (plz-media-type-process-filter 
process media-types chunk))
-                       :timeout timeout
-                       :then (cond
-                              ((symbolp then) then)
-                              ((functionp then)
-                               (lambda (object)
-                                 (let* ((media-type 
(plz-media-type--of-response media-types object))
-                                        (object (plz-media-type-then 
media-type object)))
-                                   (funcall then object))))))))
-        ;; TODO: Handle sync event stream
-        (cond
-         ((processp result)
-          result)
-         ((plz-response-p result)
-          (let ((media-type (plz-media-type--of-response media-types result)))
-            (plz-media-type-then media-type result)))
-         ((plz-error-p result)
-          (let ((media-type (plz-media-type--of-response media-types 
(plz-error-response result))))
-            (plz-media-type-else media-type result)))
-         (t result)))
+      (condition-case error
+          (let* ((plz-curl-default-args (cons "--no-buffer" 
plz-curl-default-args))
+                 (result (plz method url
+                           :as 'buffer
+                           :body body
+                           :body-type body-type
+                           :connect-timeout connect-timeout
+                           :decode decode
+                           :else (when (functionp else)
+                                   (lambda (error)
+                                     (funcall else (plz-media-type-else
+                                                    plz-media-type--current
+                                                    error))))
+                           :finally (when (functionp finally)
+                                      (lambda () (funcall finally)))
+                           :headers headers
+                           :noquery noquery
+                           :process-filter (lambda (process chunk)
+                                             (plz-media-type-process-filter 
process media-types chunk))
+                           :timeout timeout
+                           :then (cond
+                                  ((symbolp then) then)
+                                  ((functionp then)
+                                   (lambda (_)
+                                     (funcall then (plz-media-type-then
+                                                    plz-media-type--current
+                                                    
plz-media-type--response))))))))
+            (cond ((bufferp result)
+                   (with-current-buffer result
+                     (plz-media-type-then plz-media-type--current 
plz-media-type--response)))
+                  ((processp result)
+                   result)
+                  (t (user-error "Unexpected response: %s" result))))
+        (plz-error (plz-media-type--handle-sync-error media-types error)))
     (apply #'plz (append (list method url) rest))))
 
 ;;;; Footer
diff --git a/plz.el b/plz.el
index 0739a20b04..69072063c7 100644
--- a/plz.el
+++ b/plz.el
@@ -110,7 +110,7 @@
 ;;;; Structs
 
 (cl-defstruct plz-response
-  version status headers body process)
+  version status headers body)
 
 (cl-defstruct plz-error
   curl-error response message)
@@ -439,7 +439,7 @@ NOQUERY is passed to `make-process', which see.
                         (decode-coding-region (point) (point-max) 
coding-system)))
                     (funcall then (current-buffer)))))
        ('response (lambda ()
-                    (funcall then (or (plz--response :decode-p decode :process 
process)
+                    (funcall then (or (plz--response :decode-p decode)
                                       (make-plz-error :message (format 
"response is nil for buffer:%S  buffer-string:%S"
                                                                        
process-buffer (buffer-string)))))))
        ('file (lambda ()
@@ -771,7 +771,7 @@ argument passed to `plz--sentinel', which see."
 
               ;; Any other status code is considered unsuccessful
               ;; (for now, anyway).
-              (let ((err (make-plz-error :response (plz--response :process 
process))))
+              (let ((err (make-plz-error :response (plz--response))))
                 (pcase-exhaustive (process-get process :plz-else)
                   (`nil (process-put process :plz-result err))
                   ((and (pred functionp) fn) (funcall fn err)))))))
@@ -837,13 +837,11 @@ Arguments are PROCESS and STATUS (ok, checkdoc?)."
     (or (re-search-forward "\r\n\r\n" nil t)
         (signal 'plz-http-error '("plz--response: End of redirect headers not 
found")))))
 
-(cl-defun plz--response (&key (decode-p t) process)
+(cl-defun plz--response (&key (decode-p t))
   "Return response structure for HTTP response in current buffer.
 When DECODE-P is non-nil, decode the response body automatically
 according to the apparent coding system.
 
-PROCESS is the curl process object that made the request.
-
 Assumes that point is at beginning of HTTP response."
   (save-excursion
     ;; Parse HTTP version and status code.
@@ -862,8 +860,7 @@ Assumes that point is at beginning of HTTP response."
        :version http-version
        :status status-code
        :headers headers
-       :body (buffer-string)
-       :process process))))
+       :body (buffer-string)))))
 
 (defun plz--coding-system (&optional headers)
   "Return coding system for HTTP response in current buffer.

Reply via email to