Clojure Data Structures and Algorithms Cookbook
Time-sharing is about sharing a computing facility between multiple concurrent processes. In its most basic form, a scheduler decides which of these competing processes to execute at each quantum of time. This way, even a single processor core, capable only of sequential operation, can run multiple threads as if they were being executed in parallel.
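To make the idea concrete, here is a minimal sketch of plain round-robin time-sharing. It is an illustration only, not the scheduler built in this recipe. The function name and the [id remaining-quanta] pairs are assumptions. Each quantum goes to the process at the head of a queue, and that process then moves to the back of the queue if it still has work left:

```clojure
;; Hypothetical round-robin sketch (not the recipe's scheduler).
;; processes: a sequence of [id remaining-quanta] pairs, with remaining-quanta >= 1.
;; Returns the vector of process ids that ran at quanta 0, 1, 2, ...
(defn round-robin-timeline
  [processes]
  (loop [queue    (into clojure.lang.PersistentQueue/EMPTY processes)
         timeline []]
    (if-let [[id remaining] (peek queue)]
      ;; The head of the queue runs for one quantum. If it still has work,
      ;; it goes to the back of the queue with one quantum less to do.
      (recur (if (> remaining 1)
               (conj (pop queue) [id (dec remaining)])
               (pop queue))
             (conj timeline id))
      ;; The queue is empty, so every process has finished.
      timeline)))

(round-robin-timeline [[:a 3] [:b 1] [:c 2]])
;; => [:a :b :c :a :c :a]
```

Process :b finishes after its single quantum, while :a and :c keep alternating until they are done. The scheduler in this recipe refines this idea with priorities and locks.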
Locking is one way to prevent race conditions, in which multiple processes concurrently read and write inconsistent versions of the same shared location in memory. Imagine, for example, that two processes increment the same shared counter. Process 1 reads the value of the counter and overwrites it with that value + 1. Meanwhile, process 2 may do the same thing: it reads the same version of the counter that process 1 read, before process 1 has written its result, and overwrites it with that same value + 1. In that case the counter ends up incremented only once. Locking this portion of code makes process 2 wait until process 1 has finished reading and writing. Only when process 1 is done and releases the lock is process 2 allowed to play its part, and the counter reaches the correct final value: its initial value + 2.
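The lost update can be reproduced deterministically by replaying the two orders of operations step by step. In this sketch, run-interleaving and its step format are hypothetical. Each step is a [process op] pair. The :read op copies the shared counter into the process's private register, and the :write op stores that register + 1 back into the counter:

```clojure
;; Hypothetical simulation of the lost-update race described above.
;; State: the shared :counter and one private register per process (:regs).
(defn run-interleaving
  [steps]
  (:counter
   (reduce (fn [state [proc op]]
             (case op
               ;; read: copy the shared counter into this process's register
               :read  (assoc-in state [:regs proc] (:counter state))
               ;; write: store register + 1 back into the shared counter
               :write (assoc state :counter (inc (get-in state [:regs proc])))))
           {:counter 0 :regs {}}
           steps)))

;; Unsynchronized: both processes read 0 before either one writes.
(run-interleaving [[:p1 :read] [:p2 :read] [:p1 :write] [:p2 :write]])
;; => 1

;; With a lock around read + write, p2 cannot read until p1 has written.
(run-interleaving [[:p1 :read] [:p1 :write] [:p2 :read] [:p2 :write]])
;; => 2
```

The second order is exactly what the lock enforces.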
Managing locks by hand can be tedious and error-prone. That's why it is often better to use higher-level concurrency alternatives, such as those provided by Clojure: software transactional memory (refs), atoms, agents, and core.async.
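For comparison, here is a small sketch of two of these tools applied to shared state. The function names are hypothetical. swap! on an atom applies its update atomically and retries it if another thread changed the atom in between, so concurrent increments are never lost. dosync with alter updates several refs in one coordinated transaction. The scheduler later in this recipe uses the same mechanism to update its locks and scheduled state together:

```clojure
(defn concurrent-increments
  "Starts n-threads threads that each apply (swap! counter inc)
  n-incs times, waits for all of them, and returns the final value."
  [n-threads n-incs]
  (let [counter (atom 0)
        threads (vec (for [_ (range n-threads)]
                       (Thread. ^Runnable
                                (fn [] (dotimes [_ n-incs]
                                         (swap! counter inc))))))]
    (run! #(.start ^Thread %) threads)
    (run! #(.join ^Thread %) threads)
    @counter))

(concurrent-increments 4 1000)
;; => 4000

(defn transfer!
  "Moves amount from one ref to another inside a single transaction,
  so no other transaction can see one ref updated without the other."
  [from to amount]
  (dosync
   (alter from - amount)
   (alter to + amount)))

(let [a (ref 10)
      b (ref 0)]
  (transfer! a b 3)
  [@a @b])
;; => [7 3]
```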
(ns recipe3.core
  (:require [instaparse.core :as insta]    ;;=> For parsing the code of our processes
            [clojure.zip :as z]            ;;=> To walk the parse trees and generate process instructions
            [clojure.set]                  ;;=> For clojure.set/intersection, used by the scheduler
            [clojure.pprint :refer :all])) ;;=> To easily pretty-print our outputs
Instaparse (https://github.com/engelberg/instaparse) is a parser generator written in Clojure. Explaining all of the mechanisms behind Instaparse is beyond the scope of this book, but you should know that it handles context-free grammars (CFGs) and generates parse trees of your input programs according to these grammars.
Let's define a short alias for clojure.pprint/pprint, so that we can call it more conveniently:
(def p pprint)
Next, we design a small language that Instaparse will be able to interpret for us. The instructions of a single process look as follows:
heavy-op op1; light-op op2; lock l1; medium-op op3; unlock l1;
There are three kinds of operations, sorted according to the effort the scheduler needs to process them fully: heavy-op, medium-op, and finally light-op. We can also lock and unlock a portion of our programs with the lock and unlock instructions. Each of these instructions requires an identifier, so that it can be recognized in the scheduler output. Here is the grammar of this language:
(def r3-language
  "S = INSTRS
   INSTRS = ((INSTR | LOCKED-INSTRS) <optional-whitespace>)*
   INSTR = HEAVY-OP | MEDIUM-OP | LIGHT-OP
   HEAVY-OP = <optional-whitespace> 'heavy-op' <whitespace> ID <SEP>
   MEDIUM-OP = <optional-whitespace> 'medium-op' <whitespace> ID <SEP>
   LIGHT-OP = <optional-whitespace> 'light-op' <whitespace> ID <SEP>
   LOCKED-INSTRS = LOCK INSTRS UNLOCK
   LOCK = <optional-whitespace> 'lock' <whitespace> ID <SEP>
   UNLOCK = <optional-whitespace> 'unlock' <whitespace> ID <SEP>
   ID = #'[a-zA-Z0-9]+'
   PRIORITY = #'[0-9]+'
   whitespace = #'\\s+'
   optional-whitespace = #'\\s*'
   SEP = #'\\s*' ';'")
Elements enclosed in angle brackets are hidden from the parse tree. This applies to the whitespace tags, for instance. Let's parse our sample program:
(p (insta/parse (insta/parser r3-language) "heavy-op op1; light-op op2; lock l1; medium-op op3; unlock l1;"))
You'll get:
[:S
 [:INSTRS
  [:INSTR [:HEAVY-OP "heavy-op" [:ID "op1"]]]
  [:INSTR [:LIGHT-OP "light-op" [:ID "op2"]]]
  [:LOCKED-INSTRS
   [:LOCK "lock" [:ID "l1"]]
   [:INSTRS [:INSTR [:MEDIUM-OP "medium-op" [:ID "op3"]]]]
   [:UNLOCK "unlock" [:ID "l1"]]]]]
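Instaparse's transform, used by gen-program below, works bottom-up: the children of a node are transformed before the node's tag function receives them. The following simplified sketch reproduces that behavior with clojure.walk/postwalk so that it can be tried without Instaparse. It is not Instaparse's implementation, and simple-transform, tag-fns, and sample-tree are hypothetical names. Applied to the parse tree above with the same tag functions as gen-program, it yields the nested instruction structure that gen-program produces:

```clojure
(require '[clojure.walk :as walk])

(defn simple-transform
  "Simplified, hypothetical stand-in for insta/transform on hiccup-style
  trees. postwalk rebuilds children before their parent, so each
  [tag & children] vector whose tag has an entry f in transform-map is
  replaced by (apply f children), with children already transformed.
  Vectors with other tags (such as :ID) are kept as they are."
  [transform-map tree]
  (walk/postwalk
   (fn [node]
     (if-let [f (and (vector? node) (get transform-map (first node)))]
       (apply f (rest node))
       node))
   tree))

;; The same tag functions as in gen-program.
(def tag-fns
  {:S identity
   :INSTRS (fn [& args] (vec args))
   :INSTR identity
   :HEAVY-OP (fn [_ y] {:inst-type :heavy-op :inst-id (get y 1)})
   :MEDIUM-OP (fn [_ y] {:inst-type :medium-op :inst-id (get y 1)})
   :LIGHT-OP (fn [_ y] {:inst-type :light-op :inst-id (get y 1)})
   :LOCKED-INSTRS (fn [& args] (vec args))
   :LOCK (fn [_ y] {:inst-type :lock :inst-id {:lock (get y 1)}})
   :UNLOCK (fn [_ y] {:inst-type :unlock :inst-id {:unlock (get y 1)}})})

;; The parse tree shown above, written as a literal.
(def sample-tree
  [:S
   [:INSTRS
    [:INSTR [:HEAVY-OP "heavy-op" [:ID "op1"]]]
    [:INSTR [:LIGHT-OP "light-op" [:ID "op2"]]]
    [:LOCKED-INSTRS
     [:LOCK "lock" [:ID "l1"]]
     [:INSTRS [:INSTR [:MEDIUM-OP "medium-op" [:ID "op3"]]]]
     [:UNLOCK "unlock" [:ID "l1"]]]]])

(simple-transform tag-fns sample-tree)
;; => [{:inst-type :heavy-op, :inst-id "op1"}
;;     {:inst-type :light-op, :inst-id "op2"}
;;     [{:inst-type :lock, :inst-id {:lock "l1"}}
;;      [{:inst-type :medium-op, :inst-id "op3"}]
;;      {:inst-type :unlock, :inst-id {:unlock "l1"}}]]
```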
We use Instaparse's transform to eliminate the rule tags and get a more useful representation of our instructions. The transform function takes a map from tags to functions. For each tag, it applies the associated function to the elements that follow that tag in the vector it heads:
(defn gen-program
[parser program]
(insta/transform
{:S identity
:INSTRS (fn [& args] (vec args))
:INSTR identity
:HEAVY-OP (fn [x y] {:inst-type :heavy-op :inst-id (get y 1)})
:MEDIUM-OP (fn [x y] {:inst-type :medium-op :inst-id (get y 1)})
:LIGHT-OP (fn [x y] {:inst-type :light-op :inst-id (get y 1)})
:LOCKED-INSTRS (fn [& args] (vec args))
:LOCK (fn [x y] {:inst-type :lock :inst-id {:lock (get y 1)} })
:UNLOCK (fn [x y] {:inst-type :unlock :inst-id {:unlock (get y 1)}})}
;;=> This map tells transform how to transform the elements
;;   following each tag.
(parser program)))
;;=> The raw parse tree emitted by Instaparse.
Let's test gen-program. Input the following code in the REPL:
(p (gen-program (insta/parser r3-language)
"heavy-op op1;
light-op op2;
lock l1;
medium-op op3;
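unlock l1;"))
You'll get the following output:
[{:inst-type :heavy-op, :inst-id "op1"}
 {:inst-type :light-op, :inst-id "op2"}
 [{:inst-type :lock, :inst-id {:lock "l1"}}
  [{:inst-type :medium-op, :inst-id "op3"}]
  {:inst-type :unlock, :inst-id {:unlock "l1"}}]]
We still need to flatten this nested structure into a plain sequence of instructions. The following function walks it with a zipper and adds a process-id attribute and a priority attribute to its output:
(defn fire-a-process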
[grammar
program
process-id
priority]
(let [prsr (insta/parser grammar) ;;=> the parser
vec-instructions (gen-program prsr program)
;;=> the nested structure
zpr (z/vector-zip vec-instructions)]
(loop [result []
loc (-> zpr z/down)]
(if (z/end? loc)
;;=> the end of recursion, no more nodes to visit
{:process-id process-id
:instructions result
:priority priority} ;;=> We generate the process
(recur (if (map? (z/node loc))
;;=> We only append to result the elements of type 'map'
(conj result (z/node loc))
result)
;;=> else we pass result as is in the recursion
(z/next loc))))))
;;=> and we recur with the next element.
Let's fire a process with the id :process-1 and the priority 10. Input the following in your REPL:
(fire-a-process r3-language
"heavy-op op1;
light-op op2;
lock l1;
medium-op op3;
unlock l1;"
:process-1
10)
You'll get the following process:
{:process-id :process-1,
:instructions
[{:inst-type :heavy-op, :inst-id "op1"}
{:inst-type :light-op, :inst-id "op2"}
{:inst-type :lock, :inst-id {:lock "l1"}}
;;=> Note that the :inst-id of a lock or unlock instruction is a map,
;;   {:lock id} or {:unlock id}, so that locking and unlocking
;;   instructions are not mistaken for one another.
{:inst-type :medium-op, :inst-id "op3"}
{:inst-type :unlock, :inst-id {:unlock "l1"}}],
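:priority 10}
Each instruction type needs a given number of time quanta to be fully processed. We define these efforts in a map:
(def insts-effort {:heavy-op 10 :medium-op 5 :light-op 2 :lock 1
                   :unlock 1})
An instruction depends on every lock that appears before it in its process. To find these dependencies, we start by locating every lock in a process's instructions:
(defn all-locks-indices [instructions]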
;;=> instructions is the :instructions vector of the output of
;;   fire-a-process.
(let [locks (filter #(= (:inst-type %) :lock)
instructions)
;;=> We find out all the 'locks' in 'instructions'.
lock-indices (map (fn [l] {:lock-id (l :inst-id)
:lock-idx (.indexOf instructions l)})
locks)]
;;=> For every lock, we find its index in instructions and prepare
;;   a map with it.
lock-indices))
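;;=> For :process-1, this returns ({:lock-id {:lock "l1"}, :lock-idx 2})
Using these indices, the-locks-inst-depends-on returns the IDs of all the locks that appear before a given instruction in its process:
(defn the-locks-inst-depends-on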
[instructions instruction]
(let [the-inst-idx (.indexOf instructions instruction)
the-lock-idxs (all-locks-indices instructions)]
(into [] (->> the-lock-idxs
(filter #(> the-inst-idx (:lock-idx %) ))
(map :lock-id)))))
We also need to keep track of the state of each lock. We define the lock and unlock functions to do this:
(defn lock
"locks lock lock-id in locks map"
[locks process-id lock-id]
(assoc locks lock-id {:locker process-id :locked true}))
(defn unlock
"unlocks lock lock-id in locks map"
[locks process-id lock-id]
(assoc locks lock-id {:locker process-id :locked false}))
;;=> The state of a lock records whether it is locked and which
;;   process locked it.
The is-locked? function relies on this mechanism to tell whether an instruction is locked at a given point in time, in which case the scheduler cannot fire it:
(defn is-locked?
[process-id
instructions
locks
instruction]
(let [inst-locks (the-locks-inst-depends-on instructions instruction)]
(some true? (map #(and (not= process-id (:locker (get locks %)))
                       (:locked (get locks %)))
                 inst-locks))))
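;;=> If some of the locks the instruction depends on are locked
;;   (:locked true) and the locker is not its own process, then the
;;   instruction is considered locked.
We also need a state map, which we'll call scheduled. For each process, it records the time quanta already allocated to each of its instructions. Let's say that this map should look like the following:
[{:process-id :process-1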
:instructions
[{:times [1 2 3], :inst-id "op1", :inst-type :heavy-op}
  {:times [4 5 6], :inst-id "op2", :inst-type :medium-op}]}
 {:process-id :process-2
:instructions
[{:times [7 8], :inst-id "op1", :inst-type :heavy-op}
{:times [9 10], :inst-id "op2", :inst-type :medium-op}]}]
;;=> :times contains the vector of time quanta allocated to the
;;   instruction.
From this map, scheduled-processes-parts counts the number of quanta already allocated to each instruction. This will be handy in knowing whether an instruction is complete:
(defn scheduled-processes-parts
[scheduled]
(into [] (map (fn [p] {:process-id (:process-id p)
:instructions (into []
(map (fn [i] {:inst-id (:inst-id i)
:inst-type (:inst-type i)
:count (count (:times i))})
(:instructions p)))})
scheduled)))
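;;=> This function replaces the :times vector of each instruction in
;;   "scheduled" with :count n, the number of quanta allocated so far.
Based on these counts, we define the predicates incomplete-instruction?, incomplete-process?, and more-incomplete-processes?, which we'll use later on:
(defn incomplete-instruction?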
[instruction-w-count]
(let [instr-effort (insts-effort (instruction-w-count :inst-type))
instr-count (instruction-w-count :count)]
(< instr-count instr-effort)))
(defn incomplete-process?
[process-w-counts]
(let [instrs-w-count (process-w-counts :instructions)]
(some true? (map incomplete-instruction?
instrs-w-count))))
(defn more-incomplete-processes?
[processes-w-count]
(some true? (map incomplete-process?
processes-w-count)))
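;;=> processes-w-count is the "scheduled" state map, as transformed
;;   by scheduled-processes-parts.
Now, for a given process, we need to find the first instruction that is incomplete and not locked by another process:
(defn find-inst-to-be-fired-in-process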
[locks
process-id
the-process-instructions
the-process-scheduled-parts]
(let [p-not-locked-instrs (set (->> the-process-instructions
(filter #(not (is-locked? process-id
the-process-instructions
locks
%)))))
;;=> A set of not locked instructions
p-incomplete-instrs (set (->> (:instructions the-process-scheduled-parts)
(filter incomplete-instruction?)
(map #(dissoc % :count))))
;;=> A set of incomplete instructions
fireable-instrs (clojure.set/intersection p-not-locked-instrs
p-incomplete-instrs)
;;=> Their intersection
instr-id-to-fire (->> fireable-instrs
(sort-by #(.indexOf the-process-instructions %) < )
(first)
(:inst-id))]
;;=> The first one of them, in program order
instr-id-to-fire))
Next comes progress-on-process!. It considers one particular process, fires the fireable instruction found by the preceding function, and updates the locks and scheduled states. This is quite a long function, as it is the heart of the scheduler:
(defn progress-on-process!
[locks-ref
scheduled-ref
the-process
quantum]
(let [the-process-instrs (the-process :instructions)
processes-scheduled-parts (scheduled-processes-parts @scheduled-ref)
the-process-scheduled-parts (->> processes-scheduled-parts
(filter #(= (:process-id %)
(:process-id the-process)))
(first))]
;;=> Here we compute the scheduled parts of all processes and keep
;;   only the one relevant to this process's :process-id.
(if-let [the-instr-to-fire-id (find-inst-to-be-fired-in-process @locks-ref
(:process-id the-process)
the-process-instrs
the-process-scheduled-parts )]
;;=> If there is one instruction in "process-id" to be fired;
(dosync
;;=> We use the refs, because we need to do transactions involving
;; both "scheduled" and "locks"
(let [the-instr-to-fire (->> the-process-instrs
(filter #(= (:inst-id %)
the-instr-to-fire-id))
(first))]
;;=> We get the entry relevant to this instruction-id
(cond
(= (:inst-type the-instr-to-fire) :lock ) (alter locks-ref
lock
(:process-id the-process)
the-instr-to-fire-id)
(= (:inst-type the-instr-to-fire) :unlock ) (alter locks-ref
unlock
(:process-id the-process)
{:lock
(:unlock the-instr-to-fire-id)})))
;;=> If it is a "lock" or "unlock", We update the "locks" state
;; map
(let [p-in-scheduled (->> @scheduled-ref
(filter #(= (:process-id %)
(:process-id the-process)))
(first))
;;=> To update the "scheduled" ref, we begin by finding the
;;   process with this :process-id in the processes vector
instr-in-p-in-scheduled (->> (get p-in-scheduled :instructions)
(filter #(= (:inst-id %) the-instr-to-fire-id))
(first))
;;=> Then we find the instruction in this process
idx-p-in-scheduled (max 0 (.indexOf @scheduled-ref
p-in-scheduled))
idx-inst-in-p-in-scheduled (max 0
(.indexOf (get p-in-scheduled :instructions)
instr-in-p-in-scheduled))
;;=> We compute the indices of the process and of the instruction.
;;   (max 0 ...) guards against -1, which .indexOf returns when an
;;   element is not found.
times-in-inst-in-p-in-scheduled (get
(get (p-in-scheduled :instructions)
idx-inst-in-p-in-scheduled) :times )
;;=> We get the times vector in "scheduled" related to this
;; instruction
_ (alter scheduled-ref assoc-in
         [idx-p-in-scheduled :instructions idx-inst-in-p-in-scheduled :times]
         (conj times-in-inst-in-p-in-scheduled quantum))])
;;=> Using assoc-in with indices and keys as a "path vector", we
;;   update the "scheduled" ref with the times vector to which we
;;   append the current quantum.
true)
;;=> If we were able to find a fireable instruction,
;; we issue "true".
false)))
;;=> Else we issue "false".
Before scheduling, we need functions that prepare the initial locks and scheduled maps, which are used by progress-on-process!:
(defn prepare-scheduled
[processes]
(into [] (->> processes
(map (fn[p] {:process-id (:process-id p)
:instructions (into []
(->> (:instructions p)
(map (fn [i] (assoc i
:times [])))))})))))
;;=> We prepare "scheduled" as a copy of the processes vector in
;;   which every instruction gets an empty :times vector.
(defn prepare-locks-for-a-p
[a-process]
(let [locks (filter #(= (:inst-type %) :lock )
(:instructions a-process))]
(reduce (partial apply unlock) {} (map (fn [l] [(:process-id a-process)
(:inst-id l)])
locks))))
;;=> A helper function that prepares the locks of a process, all
;;   initially unlocked.
(defn prepare-locks
[processes]
(reduce merge (map prepare-locks-for-a-p processes)))
;;=> Applying prepare-locks-for-a-p, we generate the locks for all
;;   the processes that will run concurrently.
The scheduler cycles through the processes. In each cycle, every process gets as many consecutive quanta as its priority. Suppose we have process p1 with priority 3, process p2 with priority 2, and process p3 with priority 1. The sequence of processes the scheduler cycles through would then be:
[p1 p1 p1 p2 p2 p3 p1 p1 p1 p2 p2 p3 ...]
(defn gen-processes-cycles
[processes]
(let [sorted-procs-by-prio (sort-by :priority > processes)
procs-pattern (mapcat #(repeat (:priority %)
%)
sorted-procs-by-prio)]
;;=> A pattern repeats each process "priority" times, in
;;   decreasing order of priority.
(cycle procs-pattern)))
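;;=> Generates an infinite sequence like the one described above.
To bound the scheduler's run, we need the time a process would take if it were executed alone, that is, the sum of the efforts of its instructions:
(defn process-sequential-time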
[a-process]
(let [instructions (a-process :instructions)
inst-types (map :inst-type instructions)
lengths (map #(get insts-effort %) inst-types)]
(reduce + lengths)))
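;;=> We get the instruction types, grab their efforts from the
;;   insts-effort map, and sum them all up using reduce.
Finally, schedule-programs builds the processes from their programs. At each quantum, it takes the next process in the weighted cycle and calls progress-on-process! on it. Note that we launch this on several programs, as we are implementing time-sharing to do multithreading:
(defn schedule-programs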
[language programs]
;;=> programs are maps : {:program "the textual program",
;; :process-id the-process-id
;; :priority the-process-priority }
(let [processes (into [] (map #(fire-a-process language
(:program %)
(:process-id %)
(:priority %)) programs))
;;=> Processes are constructed
timeout (* 2 (reduce + (map process-sequential-time
processes)))
;;=> "timeout" is twice the total length of all the processes
;;   executed one after the other.
locks (ref (prepare-locks processes))
scheduled (ref (prepare-scheduled processes))
processes-cycles (gen-processes-cycles processes)]
;;=> We prepare "locks" and "scheduled" refs, and the weighted
;; process repetitions that the scheduler will have to cycle
;; through
(loop [quantum 0
remaining-processes processes-cycles]
;;=> We loop
(if (and (more-incomplete-processes? (scheduled-processes-parts @scheduled))
(< quantum timeout))
(do
(progress-on-process! locks scheduled
(first remaining-processes)
quantum)
;;=> progress on the selected process, with current "quantum"
(recur (inc quantum)
(next remaining-processes)))
;;=> Go to the next iteration, incrementing quantum and moving on
;;   to the next process in the weighted cycle.
@scheduled)))
Now, let's define two sample programs and see how they perform. First, define them in your REPL:
(def programs [{:priority 3
                :program "heavy-op op1;light-op op2;lock l1;medium-op op3;unlock l1;"
                :process-id :pr1}
               {:priority 1
                :program "lock l1;medium-op op4;unlock l1;medium-op op5;"
                :process-id :pr2}])
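Before launching the scheduler, we can work out by hand the bound it will use for these two programs. process-sequential-time sums the insts-effort values of a process's instructions, and schedule-programs sets the timeout to twice the total. The following quick check is self-contained. The names effort-per-type, sequential-quanta, and schedule-timeout are hypothetical, and the instruction-type vectors are transcribed from the two program strings:

```clojure
;; Same values as the recipe's insts-effort map, under a hypothetical
;; name so that the original definition is not shadowed.
(def effort-per-type {:heavy-op 10 :medium-op 5 :light-op 2 :lock 1 :unlock 1})

(defn sequential-quanta
  "Quanta a process needs if it runs alone, given its instruction types
  (the same sum that process-sequential-time computes)."
  [inst-types]
  (reduce + (map effort-per-type inst-types)))

(defn schedule-timeout
  "Twice the sum of the sequential times, as in schedule-programs."
  [processes-inst-types]
  (* 2 (reduce + (map sequential-quanta processes-inst-types))))

;; Instruction types transcribed from the two program strings.
(def pr1-types [:heavy-op :light-op :lock :medium-op :unlock])
(def pr2-types [:lock :medium-op :unlock :medium-op])

(sequential-quanta pr1-types)            ;; => 19
(sequential-quanta pr2-types)            ;; => 12
(schedule-timeout [pr1-types pr2-types]) ;; => 62
```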
Now, launch schedule-programs:
(p (schedule-programs r3-language programs))
By launching it, you'll get the following output:
[{:process-id :pr1,
  :instructions
  [{:times [0 1 2 4 5 6 8 9 10 12], :inst-type :heavy-op, :inst-id "op1"}
   {:times [13 14], :inst-type :light-op, :inst-id "op2"}
   {:times [16], :inst-type :lock, :inst-id {:lock "l1"}}
   {:times [17 18 20 21 22], :inst-type :medium-op, :inst-id "op3"}
   {:times [24], :inst-type :unlock, :inst-id {:unlock "l1"}}]}
 {:process-id :pr2,
  :instructions
  [{:times [3], :inst-type :lock, :inst-id {:lock "l1"}}
   {:times [7 11 15 27 31], :inst-type :medium-op, :inst-id "op4"}
   {:times [35], :inst-type :unlock, :inst-id {:unlock "l1"}}
   {:times [39 43 47 51 55], :inst-type :medium-op, :inst-id "op5"}]}]
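Reading the :times vectors process by process makes the interleaving hard to follow. A small helper can flip a schedule like this one into a single timeline sorted by quantum. The timeline function is hypothetical and not part of the recipe:

```clojure
(defn timeline
  "Hypothetical helper: turns a schedule (the output of schedule-programs)
  into a vector of [quantum process-id inst-id] triples sorted by quantum."
  [scheduled]
  (->> (for [{:keys [process-id instructions]} scheduled
             {:keys [inst-id times]}           instructions
             t                                 times]
         [t process-id inst-id])
       (sort-by first)
       vec))

;; A tiny hand-made schedule to show the shape of the result.
(timeline [{:process-id :p1 :instructions [{:inst-id "a" :times [0 2]}]}
           {:process-id :p2 :instructions [{:inst-id "b" :times [1]}]}])
;; => [[0 :p1 "a"] [1 :p2 "b"] [2 :p1 "a"]]
```

Applied to the output above, the timeline has 31 entries, exactly the 19 + 12 quanta that :pr1 and :pr2 need when run sequentially. It starts with quanta 0, 1, and 2 going to op1 of :pr1 and quantum 3 going to the lock of :pr2. Gaps in the quantum sequence, such as 19 and 23, are turns in which the selected process had no fireable instruction.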