Skip to content

Instantly share code, notes, and snippets.

@phronmophobic
Created November 1, 2025 18:43
Show Gist options
  • Select an option

  • Save phronmophobic/486372516e844684f18a51f856bfa8c2 to your computer and use it in GitHub Desktop.

Select an option

Save phronmophobic/486372516e844684f18a51f856bfa8c2 to your computer and use it in GitHub Desktop.
;; {:deps {org.clojure/core.async {:mvn/version "1.9.808-alpha1"}}}
(defn parallel-proc
([] {:params {:n "level of parallelism"
:f "compute function"}
:ins {:in "Input channel"}
:outs {:out "Output channel"}})
([{:keys [n f]}]
(let [work-output-ch (async/chan n)
work-input-ch (async/chan n)]
(async/thread-call
(fn []
(loop []
(when-let [msg (async/<!! work-input-ch)]
(async/thread
(async/>!! work-output-ch (f msg))))
(recur)))
:io)
{::flow/in-ports {:work-output work-output-ch}
::flow/out-ports {:work-input work-input-ch}
:n n
:in-flight 0}))
([state status]
(when (= status ::flow/stop)
(async/close! (-> state ::flow/in-ports :work-output))
(async/close! (-> state ::flow/out-ports :work-input)))
state)
([state in msg]
(case in
:in
(let [state (update state :in-flight inc)
in-flight (:in-flight state)
state (if (>= in-flight (:n state))
(assoc state ::flow/input-filter
(fn [cid]
(not= cid :in)))
state)]
[state {:work-input [msg]}])
:work-output
(let [state (update state :in-flight dec)
state (if (::flow/input-filter state)
(dissoc state ::flow/input-filter)
state)]
[state {:out [msg]}]))))
(def gdef
{:procs
{:parallel
{:proc (flow/process #'parallel-proc)
:args {:n 5
:f (fn [nsecs]
(println "starting " nsecs)
(Thread/sleep (* 1000 nsecs))
(println "done nsecs" nsecs)
nsecs)}}
:prn-sink
{:proc (flow/process
(flow/map->step
{:describe (fn [] {:ins {:in "gimme stuff to print!"}})
:transform (fn [_ _ v] (prn :prn v))}))}}
:conns
[
[[:parallel :out] [:prn-sink :in]]
,]
,})
(def g (flow/create-flow gdef))
(flow/start g)
(flow/resume g)
(flow/inject g [:parallel :in] [(inc (rand-int 5))])
(dotimes [i 20]
(flow/inject g [:parallel :in] [5]))
(flow/stop g)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment