Here:
(ns dda.test)
(def test-infinite-lazy-seq (repeatedly
                             (fn [] {:id (rand-int 2)
                                     :val (rand-int 10)})))
(def test-finite-seq [{:id 1 :val 1}
                      {:id 1 :val 2}
                      {:id 3 :val 1}])
(defn parallel-per
  [k seqf ls]
  ;; mapv instead of the lazy map, so each group's work really runs inside pmap
  (pmap #(mapv seqf %) (vals (group-by #(k %) ls))))
(defn get-next
  "Your code to call Kinesis for the next x items would be here."
  [x]
  (take x test-infinite-lazy-seq))
(def processed (atom []))
(dotimes [_ 3] ; This would be a doseq instead, or whatever you need it to be
  (swap! processed
         #(concat % (parallel-per :id
                                  (fn [m] (update m :val inc))
                                  (get-next 20)))))
(parallel-per :id
              (fn [m] (update m :val inc))
              test-finite-seq)
This is what you would do if you wanted to "chunk" it: just use group-by
instead of partition-by. The difference is that parallel-per loses the
ability to process infinite sequences, because group-by is eager and so
parallel-per is now mostly eager too. So you have to call it in a loop,
passing it the next chunk each time.
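
The chunking loop described above could also be written as a lazy pipeline
rather than a dotimes/atom loop. This is only a sketch under a few
assumptions: parallel-per-chunked and stream are hypothetical names that are
not in the code above, partition-all is used to cut the stream into batches,
and parallel-per is redefined here with mapv so that each group is fully
processed inside its pmap task.

```clojure
(defn parallel-per
  "Groups ls by (k item) and applies seqf to every item, one group per pmap task."
  [k seqf ls]
  (pmap #(mapv seqf %) (vals (group-by k ls))))

(defn parallel-per-chunked
  "Hypothetical helper: cuts ls into chunks of at most n items and runs
  parallel-per on each chunk. Chunks are taken lazily, so ls may be infinite."
  [n k seqf ls]
  (mapcat #(parallel-per k seqf %) (partition-all n ls)))

;; Stand-in for the Kinesis stream: an endless sequence of random records.
(def stream (repeatedly (fn [] {:id (rand-int 2) :val (rand-int 10)})))

;; Only as many chunks as needed to produce four groups are pulled from stream.
(def first-groups
  (doall (take 4 (parallel-per-chunked 20 :id #(update % :val inc) stream))))
```

Grouping still only happens within a chunk, so the same :id can show up in
several result groups, one per chunk it appears in. A larger n gives fewer,
bigger groups at the cost of waiting longer for each batch to fill.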
On Tuesday, 20 June 2017 19:28:04 UTC-7, Didier wrote:
>
> Do you want something like this?
>
> (ns dda.test)
>
> (def test-infinite-lazy-seq (repeatedly
>                              (fn [] {:id (rand-int 2)
>                                      :val (rand-int 10)})))
>
> (def test-finite-seq [{:id 1 :val 1}
>                       {:id 1 :val 2}
>                       {:id 3 :val 1}])
>
> (defn parallel-per
>   [k seqf ls]
>   (pmap #(map seqf %) (partition-by #(k %) ls)))
>
> (take 10 (parallel-per :id
>                        (fn [m] (update m :val inc))
>                        test-infinite-lazy-seq))
>
> (parallel-per :id
>               (fn [m] (update m :val inc))
>               test-finite-seq)
>
>
> It handles your simple example, and can also handle infinite sequences
> lazily, since I assumed your Kinesis stream would be infinite and you want
> to process things as they come through.
>
> Now this only parallelizes groups that come through back to back. It is not
> possible to do a group-by on ":id" over an infinite sequence, so the only
> thing you could do better than this would be to chunk. So you could take in
> batches of 100 from the stream, then group-by on each batch, and parallelize
> each group. I can try to write a solution for that too if you want.
>
>
> On Tuesday, 20 June 2017 11:57:59 UTC-7, Tom Connors wrote:
>>
>> Great, I'll watch that video. Thanks again.
>>
>
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en