(ns lowly.minions
"Lowly minions get your work done in an obvious way."
(:refer-clojure :exclude [future send-off])
(:require [com.mefesto.wabbitmq :as wabbit]
[clojure.tools.logging :as log]
[leiningen.util.ns :as ns]) TODO (:import (java.util UUID)
(java.lang.management ManagementFactory)))
(def ^{:doc "Namespace in which minions work." :private true} context
(binding [*ns* (create-ns 'lowly.minions.context)] (refer-clojure) *ns*))
(def ^{:doc "Message being evaled by worker." :dynamic true} *current-message*)
(defn ^{:dynamic true} *exception-handler*
"Default exception handler simply logs. Rebind to perform your own recovery."
[e msg]
(log/warn e "Minion ran into trouble:" (String. (:body msg))))
(defn ^{:internal true :doc "Public for macro-expansion only!"} init [config]
(try (wabbit/exchange-declare (:exchange config "lowly.minions")
(:exchange-type config "direct")
(:exchange-durable config true)
(:exchange-auto-delete config false))
(wabbit/queue-declare (:queue config "lowly.minions.work")
(:durable config true))
(catch Exception e
(log/error e "Couldn't declare exchange/queue."))))
(alter-var-root #'init memoize)
(def ^{:dynamic true} *config* nil)
(defmacro with-minions [config & body]
`(if (or (and *config* (:implicit ~config))
(= *config* ~config)) (do ~@body)
(binding [*config* ~config]
(wabbit/with-broker ~config
(wabbit/with-channel ~config
(init ~config)
(wabbit/with-exchange (:exchange ~config)
~@body))))))
(defn send-off
"Execute a form on a minion node."
([form] (send-off form {}))
([form config]
(with-minions (merge {:implicit true} config)
(log/trace "Published" (pr-str form) (:key config "lowly.minions.work"))
(wabbit/publish (:key config "lowly.minions.work")
(.getBytes (pr-str form))))))
(defn broadcast
"Like send-off, but the form runs on all minion nodes."
([form] (broadcast form {}))
([form config]
(send-off form (merge {:exchange "lowly.minions.broadcast"
:exchange-type "fanout"
:key "lowly.minions.broadcast"} config))))
(defmacro future
"Run body on a minion node and return a result upon deref."
[& body]
`(let [reply-queue# (format "lowly.minions.reply.%s" (UUID/randomUUID))]
(clojure.core/future
(with-minions (merge {:implict true} *config*)
(wabbit/queue-declare reply-queue# false true true)
(send-off (list `wabbit/publish reply-queue#
'(.getBytes (pr-str (do ~@body)))))
(wabbit/with-queue reply-queue#
(-> (wabbit/consuming-seq true) first :body String. read-string))))))
(defn register
"Register var to be available for minions to run."
[var]
(binding [*ns* context]
(refer (.getName (:ns (meta var))) :only [(:name (meta var))])))
(defn auto-register
"Register all vars with :minions metadata in namespaces starting with prefix.
This requires the namespaces, so it could cause code to be loaded. prefix
defaults to the empty string."
([prefix]
(doseq [namespace (ns/namespaces-matching prefix)]
(try (with-out-str (require namespace))
(doseq [[_ var] (ns-publics namespace)
:when (:minions (meta var))]
(register var))
(catch Throwable _))))
([] (auto-register "")))
(defn- consume [{:keys [body envelope] :as msg}]
(binding [*ns* context,*current-message* msg]
TODO (log/trace "Minion received message:" (String. body))
(eval (read-string (String. body))))
(wabbit/ack (:delivery-tag envelope)))
(defn work
"Wait for work and eval it continually."
([config]
(when (:auto-register config true)
(auto-register (:auto-register config "")))
(with-minions config
(wabbit/with-queue (:queue config "lowly.minions.work")
(log/trace "Consuming on" (:queue config "lowly.minions.work"))
(doseq [msg (wabbit/consuming-seq)]
(try (consume msg)
(catch Exception e
(*exception-handler* e msg)))))))
([] (work {:implicit true})))
(def pid (-> (ManagementFactory/getRuntimeMXBean) .getName (.split "@") first))
(def broadcast-queue-name
(format "lowly.minions.broadcast.%s.%s"
(.getHostName (java.net.InetAddress/getLocalHost)) pid))
(defn work-on-broadcast
"Wait for work on the broadcast queue and eval it continually."
([config]
(work (merge {:exchange "lowly.minions.broadcast"
:exchange-type "fanout"
:queue broadcast-queue-name} config)))
([] (work-on-broadcast {:implicit true})))