(ns lowly.minions
  "Lowly minions get your work done in an obvious way."
  (:refer-clojure :exclude [future send-off])
  (:require [com.mefesto.wabbitmq :as wabbit]
            [clojure.tools.logging :as log]
            [leiningen.util.ns :as ns]) ; TODO: spin off into lib
  (:import (java.util UUID)
           (java.lang.management ManagementFactory)))

(def ^{:doc "Namespace in which minions work." :private true} context
  (binding [*ns* (create-ns 'lowly.minions.context)] (refer-clojure) *ns*))

(def ^{:doc "Message being evaled by worker." :dynamic true} *current-message*)

(defn ^{:dynamic true} *exception-handler*
  "Default exception handler simply logs. Rebind to perform your own recovery."
  [e msg]
  (log/warn e "Minion ran into trouble:" (String. (:body msg))))

(defn ^{:internal true :doc "Public for macro-expansion only!"} init [config]
  (try (wabbit/exchange-declare (:exchange config "lowly.minions")
                                (:exchange-type config "direct")
                                (:exchange-durable config true)
                                (:exchange-auto-delete config false))
       (wabbit/queue-declare (:queue config "lowly.minions.work")
                             (:durable config true))
       (catch Exception e
         (log/error e "Couldn't declare exchange/queue."))))

(alter-var-root #'init memoize)

(def ^{:dynamic true} *config* nil)

(defmacro with-minions [config & body]
  ;; :implicit should only start a new connection if there's none active.
  `(if (or (and *config* (:implicit ~config))
           (= *config* ~config)) ; avoid redundant nesting
     (do ~@body)
     (binding [*config* ~config]
       (wabbit/with-broker ~config
         (wabbit/with-channel ~config
           (init ~config)
           (wabbit/with-exchange (:exchange ~config)
             ~@body))))))

(defn send-off
  "Execute a form on a minion node."
  ([form] (send-off form {}))
  ([form config]
     (with-minions (merge {:implicit true} config)
       (log/trace "Published" (pr-str form) (:key config "lowly.minions.work"))
       (wabbit/publish (:key config "lowly.minions.work")
                       (.getBytes (pr-str form))))))

(defn broadcast
  "Like send-off, but the form runs on all minion nodes."
  ([form] (broadcast form {}))
  ([form config]
     (send-off form (merge {:exchange "lowly.minions.broadcast"
                            :exchange-type "fanout"
                            :key "lowly.minions.broadcast"} config))))

(defmacro future
  "Run body on a minion node and return a result upon deref."
  [& body]
  `(let [reply-queue# (format "lowly.minions.reply.%s" (UUID/randomUUID))]
     (clojure.core/future
      (with-minions (merge {:implict true} *config*)
        (wabbit/queue-declare reply-queue# false true true)
        (send-off (list `wabbit/publish reply-queue#
                        '(.getBytes (pr-str (do ~@body)))))
        (wabbit/with-queue reply-queue#
          (-> (wabbit/consuming-seq true) first :body String. read-string))))))

(defn register
  "Register var to be available for minions to run."
  [var]
  (binding [*ns* context]
    (refer (.getName (:ns (meta var))) :only [(:name (meta var))])))

(defn auto-register
  "Register all vars with :minions metadata in namespaces starting with prefix.
   This requires the namespaces, so it could cause code to be loaded. prefix
   defaults to the empty string."
  ([prefix]
     (doseq [namespace (ns/namespaces-matching prefix)]
       (try (with-out-str (require namespace))
            (doseq [[_ var] (ns-publics namespace)
                    :when (:minions (meta var))]
              (register var))
            (catch Throwable _))))
  ([] (auto-register "")))

(defn- consume [{:keys [body envelope] :as msg}]
  (binding [*ns* context,*current-message* msg]
    ;; TODO: timeouts
    (log/trace "Minion received message:" (String. body))
    (eval (read-string (String. body))))
  (wabbit/ack (:delivery-tag envelope)))

(defn work
  "Wait for work and eval it continually."
  ([config]
     (when (:auto-register config true)
       (auto-register (:auto-register config "")))
     (with-minions config
       (wabbit/with-queue (:queue config "lowly.minions.work")
         (log/trace "Consuming on" (:queue config "lowly.minions.work"))
         (doseq [msg (wabbit/consuming-seq)]
           (try (consume msg)
                (catch Exception e
                  (*exception-handler* e msg)))))))
  ([] (work {:implicit true})))

(def pid (-> (ManagementFactory/getRuntimeMXBean) .getName (.split "@") first))

(def broadcast-queue-name
  (format "lowly.minions.broadcast.%s.%s"
          (.getHostName (java.net.InetAddress/getLocalHost)) pid))

(defn work-on-broadcast
  "Wait for work on the broadcast queue and eval it continually."
  ([config]
     (work (merge {:exchange "lowly.minions.broadcast"
                   :exchange-type "fanout"
                   :queue broadcast-queue-name} config)))
  ([] (work-on-broadcast {:implicit true})))

Generated by Phil Hagelberg using scpaste at Wed Jul 20 20:13:09 2011. PDT. (original)