Hi Adrian,
What exactly is the issue you're facing?
I wrote my own version and it seems to be working fine.
Please take a look; I hope it helps.
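(require '[clojure.core.async :as async]
         '[clojure.java.io :as io])

(defn process-file [ch file]
  ;; Blocking IO gets its own thread instead of tying up the go-block pool
  (async/thread
    (with-open [input (io/reader file)]
      (doseq [line (line-seq input)]
        (async/>!! ch line)))
    ;; Close the channel so downstream consumers see the end of the file
    (async/close! ch)))

(defn parse [line]
  (str "Parsed: " line)) ; change it to do whatever you want

(defn mapping [ch]
  ;; Returns a new channel carrying (parse x) for each x taken from ch
  (async/map parse [ch]))

(defn start []
  (let [lines  (async/chan)
        events (mapping lines)]
    ;; Put raw lines on `lines`; read parsed values from `events`
    (process-file lines "10_events.json")
    (async/go-loop []
      ;; <! returns nil once the channel is closed, which ends the loop
      (when-let [v (async/<! events)]
        (println v)
        (recur)))))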
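Since your original code put the parser on the channel itself with `(chan 1 (map parse-line))`, here's a variant that keeps that transducer style. It's a minimal sketch under a few assumptions: `lines->chan` and `parse-or-throw` are hypothetical names, `parse-or-throw` just upper-cases lines as a stand-in for `json/parse-string`, and the input comes from a `java.io.StringReader` instead of `10_events.json` so it runs without a file. It also passes an exception handler as the third argument to `chan`, so a line that fails to parse shows up as an `{:error ...}` value on the channel rather than being lost.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.java.io :as io]
         '[clojure.string :as str])

(defn lines->chan
  "Hypothetical helper: copies every line from `reader` onto `ch` on a
  dedicated thread, then closes `ch` so consumers see the end of input."
  [ch reader]
  (async/thread
    (with-open [rdr (io/reader reader)]
      (doseq [line (line-seq rdr)]
        (async/>!! ch line)))
    (async/close! ch))
  ch)

(defn parse-or-throw
  "Stand-in for a real JSON parser (assumption): upper-cases the line and
  throws on blank input so the error path can be seen."
  [line]
  (if (str/blank? line)
    (throw (ex-info "blank line" {}))
    (str/upper-case line)))

(def sample "one\ntwo\n\nthree")

(defn run-example []
  (let [events (async/chan 1
                           (map parse-or-throw)
                           ;; Called with the exception; a non-nil return
                           ;; value is put on the channel in place of the item
                           (fn [^Throwable e] {:error (.getMessage e)}))]
    (lines->chan events (java.io.StringReader. sample))
    ;; async/into collects everything until the channel closes
    (async/<!! (async/into [] events))))
```

Calling `(run-example)` should give the three good lines upper-cased plus one `{:error "blank line"}` entry for the empty line, in file order.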
As for your approach, it seems like a legitimate use of core.async to me.
Please let us know how it goes once you finish.
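For what it's worth, here's a rough sketch of how the three stages you described might fit together. It's only a sketch under stated assumptions: EDN stands in for JSON so there's no dependency on a JSON library, `"click"` and `"view"` are made-up document types, `handle-click`/`handle-view` are placeholder business logic, and it reads from a `StringReader` rather than your file. Stage 2 uses `async/pub` and `async/sub` to route documents by their `:type`.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.edn :as edn]
         '[clojure.java.io :as io])

;; Stage 1: blocking IO on a dedicated thread. Each line is parsed and put
;; on channel (1). EDN is an assumption standing in for JSON here.
(defn read-documents! [reader out]
  (async/thread
    (with-open [rdr (io/reader reader)]
      (doseq [line (line-seq rdr)]
        (async/>!! out (edn/read-string line))))
    (async/close! out)))

;; Stage 3: hypothetical business logic, one function per document type
(defn handle-click [doc] [:click (:id doc)])
(defn handle-view  [doc] [:view (:id doc)])

(defn run-pipeline [reader]
  (let [docs    (async/chan 64)
        ;; Stage 2: route by :type; documents whose type has no
        ;; subscriber are dropped by the pub
        by-type (async/pub docs :type)
        clicks  (async/chan 16)
        views   (async/chan 16)]
    ;; Subscribe before any data flows so nothing is dropped early
    (async/sub by-type "click" clicks)
    (async/sub by-type "view" views)
    (read-documents! reader docs)
    ;; Consumers run on the go-block pool; each result channel yields a
    ;; vector once its input channel is closed
    (let [click-results (async/into [] (async/map handle-click [clicks]))
          view-results  (async/into [] (async/map handle-view [views]))]
      {:clicks (async/<!! click-results)
       :views  (async/<!! view-results)})))

(def sample
  (str "{:type \"click\" :id 1}\n"
       "{:type \"view\" :id 2}\n"
       "{:type \"click\" :id 3}\n"
       "{:type \"other\" :id 4}\n"))
```

One design note: a `pub` only delivers to topics that already have a subscriber, which is why the subscriptions are set up before the reader thread starts. Also, a slow subscriber holds up the whole `pub`, so the per-type channels are buffered.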
Cheers,
On Tuesday, March 17, 2015 09:52:17 UTC-3, Adrian Mowat wrote:
>
> Hi,
>
> I've played around with core.async a bit, but now I'm trying to use it for
> a real project and I'm running into a problem getting data off a file and
> into a channel on the JVM (i.e., as opposed to ClojureScript).
>
> I have around 1GB of data sitting in a file. Each line of the file
> contains a separate JSON document. There are different types of document
> in the file, and I would like to use core.async to set up a pipeline of
> concurrent operations, as follows, so I can start processing the data
> before I've finished reading the file.
>
> 1. Stream the raw data out of the file one line at a time, parse it as
> JSON and write each line to channel (1)
> 2. Read from channel (1), divide the messages up by type and write them to
> new channels (2..n)
> 3. Read from channels (2..n) and apply business logic as appropriate
>
> I'd like the initial read to run in its own thread because it will be
> blocking on IO. The others can run in core.async's thread pool.
>
> I'm running into problems getting channels (1) and (2) to talk to one
> another. Here's my initial spike; I would expect it to write the 10
> lines of JSON from the example file to stdout.
>
> (defn file-to-chan [ch file]
>   (do
>     (async/thread
>       (with-open [rdr (io/reader file)]
>         (doseq [line (line-seq rdr)]
>           (>!! ch line))))
>     ch))
>
> (defn parse-line [s]
>   (json/parse-string s (comp keyword str/lower-case)))
>
> (def events (chan 1 (map parse-line)))
>
> (go
>   (while true
>     (println (<! events))))
>
> (file-to-chan events "10_events.json")
>
> I have a few questions...
>
> * Can anyone help me understand what's going wrong? (I'm sure it's
> something silly, but I'm going cross-eyed looking at it.)
> * It's effectively a batch process. Is this an appropriate use case for
> core.async?
> * If so, am I on the right track or is there a better way to approach this?
>
> Many thanks
>
> Adrian
>
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en