Thanks for the suggestion, Didier, but I was unable to find a way to make
pmap work for my use case. For those interested, here's what I came up
with, then some questions:
(defn parallel-per
"Handle records from input-chan in parallel, but records with matching
`splitter` return values serially."
[splitter handler input-chan]
(let [blockers (atom {}) ;; map of group-key to [chan num-remaining]
status-chan (async/chan)]
(async/go-loop []
(let [[val port] (async/alts! [input-chan status-chan])]
(if (= port input-chan)
(if (some? val)
(let [group-key (splitter val)]
(if-let [blocker (get @blockers group-key)]
(let [[blocker-chan ^long num-remaining] blocker
next-blocker-chan (async/chan)]
(swap! blockers assoc group-key [next-blocker-chan (inc
num-remaining)])
(async/go
(async/<! blocker-chan)
(handler val)
(async/put! status-chan group-key)
(async/close! next-blocker-chan))
(recur))
(let [blocker-chan (async/chan)]
(swap! blockers assoc group-key [blocker-chan 1])
(async/go
(handler val)
(async/put! status-chan group-key)
(async/close! blocker-chan))
(recur))))
(async/close! status-chan))
(let [group-key val
[_ ^long num-remaining] (get @blockers group-key)]
(if (> num-remaining 1)
(do
(swap! blockers update-in [group-key 1] dec)
(recur))
(do
(swap! blockers dissoc group-key)
(recur)))))))
nil))
Does anything in here look bad? Is there a way to gracefully handle
input-chan closing without using loop/recur? I'm using a new channel for
every new record to block the next record with the same `splitter` return
value - my first approach used one channel per distinct `splitter` value,
but I saw some results printed out of order. Does this mean that the order
of takes from <! is not deterministic, or that I have/had a bug?
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.