branch: elpa/el-job
commit cd57ff919e5d5246fc4cb654c685e8055d66cd14
Author: Martin Edström <meedstro...@gmail.com>
Commit: Martin Edström <meedstro...@gmail.com>

    Use a poll timer rather than after-change-functions
---
 el-job.el | 209 +++++++++++++++++++++++++++++++++-----------------------------
 1 file changed, 111 insertions(+), 98 deletions(-)

diff --git a/el-job.el b/el-job.el
index b1c4d2c28f..028b33b012 100644
--- a/el-job.el
+++ b/el-job.el
@@ -36,6 +36,8 @@
 
 ;;; Code:
 
+;; TODO: Maybe get rid of anonymous jobs
+
 (require 'cl-lib)
 (require 'subr-x) ;; for emacs 28
 (require 'compat)
@@ -239,6 +241,7 @@ being saddled with a huge item in addition to the average 
workload."
                     (push item items)))))))
         (if (length= sublists 0)
             (progn
+              ;; Degrade gracefully
               (fset 'el-job--split-optimally #'el-job--split-evenly)
               (error "el-job: Unexpected code path, report appreciated! Data: 
%S"
                      (list 'n-cores n-cores
@@ -269,10 +272,6 @@ See subroutine `el-job-child--zip' for details."
 
 ;;; Main logic
 
-(defvar el-job--machine-cores nil
-  "Max amount of processes to spawn for one job.
-Usually the number of logical cores on your machine minus 1.")
-
 (defvar el-jobs (make-hash-table :test #'eq)
   "Table of all el-job objects.")
 
@@ -300,8 +299,10 @@ with one character of your choosing, such as a dot."
   (ready nil :documentation "Processes ready for input.")
   (busy nil :documentation "Processes that have not yet returned output.")
   stderr
+  ;; Not an interesting timestamp, but list must start with something
+  ;; for `plist-put' to have an effect.
   (timestamps (list :initial-job-creation (current-time)))
-  (timeout (timer-create))
+  (poll-timer (timer-create))
   finish-times
   spawn-args
   (past-elapsed (make-hash-table :test #'equal))
@@ -311,16 +312,13 @@ with one character of your choosing, such as a dot."
   merged-results)
 
 ;;;###autoload
-(cl-defun el-job-launch ( &rest deprecated-args
-                          &key
-                          id
-                          if-busy
-                          load-features
-                          inject-vars
-                          inputs
-                          funcall-per-input
-                          callback
-                          &allow-other-keys )
+(cl-defun el-job-launch (&key id
+                              (if-busy 'wait)
+                              load-features
+                              inject-vars
+                              inputs
+                              funcall-per-input
+                              callback)
   "Run FUNCALL-PER-INPUT in one or more headless Elisp processes.
 Then merge the return values \(lists of N lists) into one list
 \(of N lists) and pass it to CALLBACK.
@@ -384,8 +382,7 @@ which you can get from this form:
     \(el-job:timestamps JOB)
 
 
-ID identifies this job, and is a symbol, a keyword, or an integer from
--536,870,911 to 536,870,911, i.e. something suitable for `eq'.
+ID is a symbol identifying this job.
 A non-nil ID has several purposes:
 
 - Prevent launching the same job twice, if the last invocation is not
@@ -411,28 +408,27 @@ still at work.  IF-BUSY may take on one of three symbols:
                (functionp funcall-per-input))
     (error "Argument FUNCALL-PER-INPUT must be a symbol with a function 
definition"))
   (when callback
-    (unless (and (symbolp callback) (functionp callback))
+    (unless (and (symbolp callback)
+                 (functionp callback))
       (error "Argument CALLBACK must be a symbol with a function definition")))
   (unless (proper-list-p load-features)
-    (error "el-job-launch: Argument LOAD-FEATURES must be a list"))
-  (setq if-busy (or if-busy 'wait))
-  (unless el-job--machine-cores
-    (setq el-job--machine-cores (max 1 (1- (num-processors)))))
+    (error "Argument LOAD-FEATURES must be a list"))
   (if (null id)
-      (let ((anonymous-job
-             (el-job--make :cores-to-use el-job--machine-cores
-                           :callback callback
-                           :queued-inputs
-                           (if (functionp inputs) (funcall inputs) inputs))))
+      (let* ((inputs (if (functionp inputs) (funcall inputs) inputs))
+             (anonymous-job
+              (el-job--make :cores-to-use (min (length inputs)
+                                               (max 1 (1- (num-processors))))
+                            :callback callback
+                            :queued-inputs inputs)))
         (el-job--spawn-processes anonymous-job
                                  load-features
                                  inject-vars
                                  funcall-per-input)
-        (el-job--exec anonymous-job))
+        (el-job--exec-pending-workload anonymous-job))
     (let ((job (or (gethash id el-jobs)
                    (puthash id (el-job--make :id id) el-jobs)))
-          (respawn nil)
-          (exec nil))
+          (do-respawn nil)
+          (do-exec nil))
       (el-job--with job ( .queued-inputs .busy .ready .cores-to-use
                           .spawn-args .callback .timestamps )
         (unless (and .busy (eq if-busy 'noop))
@@ -442,33 +438,39 @@ still at work.  IF-BUSY may take on one of three symbols:
             (setq inputs (funcall inputs)))
           (if .busy
               (pcase if-busy
-                ('takeover (setq respawn t)
-                           (setq exec t)
+                ('takeover (setq do-respawn t)
+                           (setq do-exec t)
                            (setf .queued-inputs inputs))
                 ('wait (setf .queued-inputs (append inputs .queued-inputs))))
             (setf .queued-inputs inputs)
-            (setq exec t))
-          (when exec
-            (when (< .cores-to-use el-job--machine-cores)
-              (setf .cores-to-use
-                    (min el-job--machine-cores
-                         (max .cores-to-use (length .queued-inputs)))))
+            (setq do-exec t))
+          (when do-exec
+            (setf .callback callback)
+            (setf .cores-to-use
+                  ;; FIXME: What if length of inputs has shrunk?
+                  ;; What happens when a list of 3 inputs is split by 7 cores?
+                  ;; If we decide we don't need to remember the N of cores from before,
+                  ;; taking the N of ready processes as good so long as it's not less than inputs,
+                  ;; then we could remove the .cores-to-use field.
+                  (min (max .cores-to-use (length .queued-inputs))
+                       (max 1 (1- (num-processors)))))
             (unless (and (= .cores-to-use (+ (length .busy) (length .ready)))
                          (seq-every-p #'process-live-p .ready)
-                         (seq-every-p #'process-live-p .busy))
-              (setq respawn t))
-            (let ((new-spawn-args
-                   (list job load-features inject-vars funcall-per-input)))
+                         (seq-every-p #'process-live-p .busy)) ; uhh
+              (setq do-respawn t))
+            (let ((new-spawn-args (list job
+                                        load-features
+                                        inject-vars
+                                        funcall-per-input)))
               (unless (eq (sxhash (cdr .spawn-args))
                           (sxhash (cdr new-spawn-args)))
                 (setf .spawn-args new-spawn-args)
                 (el-job--dbg 2 "New arguments, resetting processes for %s" id)
-                (setq respawn t)))
-            (setf .callback callback)
-            (when respawn
+                (setq do-respawn t)))
+            (when do-respawn
               (el-job--disable job)
               (apply #'el-job--spawn-processes .spawn-args))
-            (el-job--exec job)
+            (el-job--exec-pending-workload job)
             t))))))
 
 (defvar-local el-job-here nil)
@@ -496,7 +498,7 @@ For the rest of the arguments, see `el-job-launch'."
              "--batch"
              "--load" (el-job--ensure-compiled-lib 'el-job-child)
              "--eval" (format "(el-job-child--work #'%S)" funcall-per-input)))
-           ;; Ensure the working directory is not remote (messes things up)
+           ;; Ensure the working directory is not remote (it messes things up)
            (default-directory invocation-directory)
            (ident (or .id (number-to-string (abs (sxhash .spawn-args))))))
       (setf .stderr
@@ -520,20 +522,19 @@ For the rest of the arguments, see `el-job-launch'."
                          :sentinel #'ignore)))
               (when (string-suffix-p ">" (process-name proc))
                 (el-job--dbg 1 "Unintended duplicate process id for %s" proc))
-              (process-send-string proc vars)
-              (process-send-string proc "\n")
-              (process-send-string proc libs)
-              (process-send-string proc "\n")
               (with-current-buffer (process-buffer proc)
                 (setq-local el-job-here job)
-                (add-hook 'after-change-functions #'el-job--check-done nil t))
+                (process-send-string proc vars)
+                (process-send-string proc "\n")
+                (process-send-string proc libs)
+                (process-send-string proc "\n"))
               (push proc .ready)))
         ;; https://github.com/meedstrom/org-node/issues/75
         (( file-error )
          (el-job--disable job)
          (el-job--dbg 1 "el-job: Terminated job because of: %S" err))))))
 
-(defun el-job--exec (job)
+(defun el-job--exec-pending-workload (job)
   "Split the queued inputs in JOB and pass to all children.
 
 This puts them to work.  Each successful child will print output
@@ -541,11 +542,15 @@ This puts them to work.  Each successful child will print 
output
 should trigger `el-job--handle-output'."
   (el-job--with job
       ( .ready .busy .input-sets .result-sets .queued-inputs .cores-to-use
-        .past-elapsed .timestamps .finish-times .id .stderr .timeout )
-    (cancel-timer .timeout)
+        .past-elapsed .timestamps .finish-times .id .stderr .poll-timer )
+    (cancel-timer .poll-timer)
+    (setf .input-sets nil)
     (setf .result-sets nil)
     (setf .finish-times nil)
-    (let ((splits (el-job--split-optimally .queued-inputs .cores-to-use 
.past-elapsed)))
+    (let ((splits (el-job--split-optimally .queued-inputs
+                                           .cores-to-use
+                                           .past-elapsed))
+          busy-bufs)
       (unless (length< splits (1+ (length .ready)))
         (error "Items split in %d lists, but only %d ready processes"
                (length splits) (length .ready)))
@@ -557,46 +562,56 @@ should trigger `el-job--handle-output'."
             items proc)
         (while splits
           (setq items (pop splits))
-          (cl-assert .ready)
           (setq proc (pop .ready))
           (push proc .busy)
+          (push (process-buffer proc) busy-bufs)
           (setf (alist-get proc .input-sets) items)
           (with-current-buffer (process-buffer proc)
             (erase-buffer)
-            (remove-hook 'after-change-functions #'el-job--check-done t)
             (process-send-string proc (prin1-to-string items))
-            (process-send-string proc "\n")
-            (add-hook 'after-change-functions #'el-job--check-done nil t)))))
-    (setf .queued-inputs nil)
-    (plist-put .timestamps :work-begun (current-time))
-    (setf .timeout (run-with-timer 30 nil #'el-job--timeout .stderr))))
-
-(defun el-job--timeout (stderr-buf)
-  "Disable job corresponding to STDERR-BUF, and print that it timed out.
-If the job was idle, just reap the processes and print nothing."
-  (when-let* ((buf (get-buffer stderr-buf))
-              (job (buffer-local-value 'el-job-here buf)))
-    (let* ((was-busy (el-job:busy job))
-           (desc (or (el-job:id job)
-                     (format "once-off job that calls %S"
-                             (car (last (el-job:spawn-args job)))))))
-      (el-job--disable job)
-      (if was-busy
-          (message "el-job: Timed out, was busy for 30+ seconds: %s" desc)
-        (el-job--dbg 2 "Reaped idle processes for %s" desc)))))
-
-;; REVIEW: We use `process-send-string' to send a \n when sending more input
-;;         (in `el-job--exec').
-;;         Can that cause a bug combined with this?
-;;         Could workaround by using a NUL byte: 0 instead of ?\n.
-;;         In el-job-child.el, it'll have to use `prin1' rather than `print.'
-;;         Or, we can just remove the change-hook until after we sent the
-;;         aforementioned \n.
-(defun el-job--check-done (&rest _)
-  "Handle output in current buffer if it appears complete.
-Can be called in a process buffer at any time."
-  (if (eq (char-before) ?\n)
-      (el-job--handle-output)))
+            (process-send-string proc "\n"))))
+      (setf .queued-inputs nil)
+      (plist-put .timestamps :work-begun (current-time))
+      (setf .poll-timer (run-with-timer .02 nil #'el-job--poll 1 busy-bufs)))))
+
+;; Polling: simplistic but reliable.
+
+;; Had the clever idea to add a hook to `after-change-functions' in each
+;; process buffer to simply check (eq (char-before) ?\n) as output came in.
+
+;; Perf was good on my machine...on Emacs 30, bad on 29.  And it just seems
+;; like the kinda design that invites wild variance from machine to machine.
+
+(defun el-job--poll (n bufs)
+  (let (busy-bufs)
+    (save-current-buffer
+      (dolist (buf bufs)
+        (if (not (buffer-live-p buf))
+            (el-job--dbg 2 "Dead process buffer (this may be normal)")
+          (set-buffer buf)
+          (if (eq (char-before) ?\n)
+              (el-job--handle-output)
+            (push buf busy-bufs))))
+
+      ;; We do a chain of timers that successively ups the delay.
+      ;; To see what the delays would be, eval:
+      ;; (--map (/ (float it) (ash 1 5)) (-iterate '1+ 1 42))
+
+      ;; And to see the cumulative sums:
+      ;; (-reductions '+ (--map (/ (float it) (ash 1 5)) (-iterate '1+ 1 42)))
+      ;; As you can see, we do 7 polls inside the first second,
+      ;; but spread the last 7 polls between 20 and 30 seconds after start.
+      (if (and busy-bufs (<= n 42))
+          (setf (el-job:poll-timer el-job-here)
+                (run-with-timer
+                 (/ (float n) (ash 1 5)) nil #'el-job--poll (1+ n) busy-bufs))
+        (let ((desc (or (el-job:id el-job-here)
+                        (format "once-off job that calls %S"
+                                (car (last (el-job:spawn-args 
el-job-here)))))))
+          (el-job--disable el-job-here)
+          (if busy-bufs
+              (message "el-job: Timed out, was busy for 30+ seconds: %s" desc)
+            (el-job--dbg 2 "Reaped idle processes for %s" desc)))))))
 
 (defun el-job--handle-output ()
   "Handle output in current buffer.
@@ -625,8 +640,7 @@ more input in the queue."
     (when results
       (el-job--with job
           ( .busy .ready .input-sets .past-elapsed .result-sets .queued-inputs
-            .timestamps .id .finish-times
-            .timeout .callback .merged-results )
+            .timestamps .id .finish-times .callback .merged-results )
         (push finish-time .finish-times)
         ;; Record time spent by FUNCALL-PER-INPUT on each item in INPUTS,
         ;; for a better `el-job--split-optimally' in the future.
@@ -649,18 +663,17 @@ more input in the queue."
             (funcall .callback .merged-results job))
           (if .id
               (when .queued-inputs
-                (el-job--exec job))
-            ;; Cleanup process buffers of anonymous job
-            ;; TODO: actually just let timeout do it...
+                (el-job--exec-pending-workload job))
+            ;; Always clean up process buffers of anonymous job
             (el-job--disable job)))))))
 
 (defun el-job--disable (job)
-  "Kill processes in JOB and associated process buffers.
+  "Kill processes in JOB and their process buffers.
 
 This does not deregister the job ID.  That means the next launch with
 same ID still has the benchmarks table and possibly queued input."
-  (el-job--with job (.id .timeout .busy .ready .stderr)
-    (cancel-timer .timeout)
+  (el-job--with job (.id .busy .ready .stderr .poll-timer)
+    (cancel-timer .poll-timer)
     (let ((preserve (and .id (> el-job--debug-level 0))))
       (dolist (proc (append .busy .ready))
         (let ((buf (process-buffer proc)))

Reply via email to