I'm writing a an ETL process to read event level data from a product
database, transform / aggregate it and write to to an analytics data
warehouse. I'm using clojure's core.async library to separate these process
into concurrently executing components. Here's what the main part of my
code looks like right now
(ns data-staging.main
(:require [clojure.core.async :as async])
(:use [clojure.core.match :only (match)]
[data-staging.map-vecs]
[data-staging.tables])
(:gen-class))
(def submissions (make-table "Submission" "Valid"))
(def photos (make-table "Photo"))
(def videos (make-table "Video"))
(def votes (make-table "Votes"))
;; define channels used for sequential data processing
(def chan-in (async/chan 100))
(def chan-out (async/chan 100))
(defn write-thread [table]
"infinitely loops between reading subsequent 10000 rows from
table and ouputting a vector of the rows(maps)
into 'chan-in'"
(while true
(let [next-rows (get-rows table)]
(async/>!! chan-in next-rows)
(set-max table (:max-id (last next-rows))))))
(defn aggregator []
"takes output from 'chan-in' and aggregates it by coupon_id, date.
then adds / drops any fields that are needed / not needed and inputs
into 'chan-out'"
(while true
(->>
(async/<!! chan-in)
aggregate
(async/>!! chan-out))))
(defn read-thread []
"reads data from chan out and interts into Analytics DB"
(while true
(upsert (async/<!! chan-out))))
(defn -main []
(async/thread (write-thread submissions))
(async/thread (write-thread photos))
(async/thread (write-thread videos))
(async/thread-call aggregator)
(async/thread-call read-thread))
As you can see, I'm putting each os component on to its own thread and
using the blocking >!! call on the channels. It feels like using the
non-blocking >! calls along with go routines might be better for this use
case, especially for the database reads which spend most of their time
performing i/o and waiting for new rows in the product db. Is this the
case, and if so, what would be the best way to implement it? I'm a little
unclear on all the tradeoffs between the two methods and on exactly how to
effectively use go routines. Also any other suggestions on how to improve
the overall architecture would be much appreciated!
--
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.