Clojure Data Structures and Algorithms Cookbook

By: Rafik Naccache

Simulating multithreading using time-sharing


Time-sharing is about sharing a computing facility between multiple concurrent processes. In its most basic form, a scheduler decides which of these competing processes to execute at every quantum of time. This way, even a single processor core, capable only of sequential operation, can run multiple threads as if they were being executed in parallel.

One method of preventing race conditions, that is, multiple processes concurrently reading and writing stale versions of the same shared place in memory, is locking. Imagine, for example, that two processes increment the same shared counter. Process 1 reads the value of the counter and overwrites it with the value + 1. If, meanwhile, process 2 does the same thing (that is, it reads the same version of the counter that process 1 read and overwrites it with that value + 1), you end up with a counter that has only been incremented once. Locking this portion of code makes process 2 wait until process 1 has finished reading and writing. Only when process 1 is done and releases the lock is process 2 allowed to play its part, leading to the correct final value: the counter + 2.
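
As a minimal sketch of the scenario above (it is not part of the recipe), the following code first replays the bad interleaving by hand, and then guards a deliberately non-atomic read-then-write with Clojure's locking macro. The names lost-update, counter-lock, shared-counter, locked-increment!, and run-two-processes! are hypothetical and only introduced for this illustration.

```clojure
(defn lost-update
  "Replays the bad interleaving by hand: both processes read the
  counter before either one writes it back."
  [counter]
  (let [read-by-p1 counter
        read-by-p2 counter              ; p2 reads the same version as p1
        written-by-p1 (inc read-by-p1)
        written-by-p2 (inc read-by-p2)] ; overwrites the result of p1
    written-by-p2))

(lost-update 0)
;;=> 1, although two increments took place

;; Hypothetical shared state and lock object, for illustration only.
(def counter-lock (Object.))
(def shared-counter (atom 0))

(defn locked-increment!
  "A deliberately non-atomic read-then-write, made safe by a lock."
  []
  (locking counter-lock
    (reset! shared-counter (inc @shared-counter))))

(defn run-two-processes!
  "Starts two threads that each increment the counter n times, waits
  for both of them, and returns the final value."
  [n]
  (reset! shared-counter 0)
  (let [work (fn [] (dotimes [_ n] (locked-increment!)))
        threads [(Thread. ^Runnable work) (Thread. ^Runnable work)]]
    (run! #(.start ^Thread %) threads)
    (run! #(.join ^Thread %) threads)
    @shared-counter))

(run-two-processes! 1000)
;;=> 2000
```

Because only one thread at a time can be inside the locking form, no increment is lost; without it, the final value could be anything between 1000 and 2000.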

Note

Managing locks can be tedious and error-prone. That's why it is often better to use high-level concurrency alternatives, such as those provided by Clojure: refs (backed by software transactional memory), atoms, agents, and core.async.
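
To give an idea of what two of these alternatives look like, here is a short sketch, assuming nothing beyond clojure.core. An atom handles the shared counter without any explicit lock, and two refs are updated together inside a transaction. The names hits, count-hits!, source, target, and move! are hypothetical.

```clojure
;; An atom: swap! retries the update until it applies cleanly, so no
;; increment is lost even when two threads compete.
(def hits (atom 0))

(defn count-hits!
  "Runs two threads that each increment 'hits' n times and returns
  the final value."
  [n]
  (reset! hits 0)
  (let [work (fn [] (dotimes [_ n] (swap! hits inc)))
        threads [(Thread. ^Runnable work) (Thread. ^Runnable work)]]
    (run! #(.start ^Thread %) threads)
    (run! #(.join ^Thread %) threads)
    @hits))

(count-hits! 1000)
;;=> 2000

;; Refs: both alterations happen in one transaction, or not at all.
(def source (ref 10))
(def target (ref 0))

(defn move!
  "Moves 'amount' from source to target and returns both values."
  [amount]
  (dosync
   (alter source - amount)
   (alter target + amount)
   [@source @target]))

(move! 3)
;;=> [7 3]
```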

How to do it...

  1. First of all, we'll begin by requiring the libraries that we will use:

    (ns recipe3.core
      (:require [instaparse.core :as insta]
                ;;=> To parse the code of our processes.
                [clojure.zip :as z]
                ;;=> To walk the parse trees and generate the processes'
                ;;   instructions.
                [clojure.set]
                ;;=> For the set intersection used by the scheduler.
                [clojure.pprint :refer [pprint]]))
                ;;=> To pretty-print our outputs.

    Note

    Instaparse (https://github.com/engelberg/instaparse) is a parser generator written in Clojure. Explaining all of the mechanisms behind Instaparse is beyond the scope of this book, but you should know that it handles context-free grammars (CFGs) and generates parse trees of your input programs according to the grammar you give it.

  2. To be able to pretty-print our output in the REPL, let's define an alias for clojure.pprint/pprint, so that we can call it more conveniently:

    (def p pprint)
  3. As we'll be spawning processes with instructions of their own, let's define a minimal language that Instaparse will be able to parse for us. The instructions of a single process in our language look as follows:

    heavy-op op1;
    light-op op2;
    lock l1;
    medium-op op3;
    unlock l1;
  4. The previous snippet is self-explanatory. Our language contains only three types of operations, which are ranked according to the effort the scheduler needs in order to fully process them: heavy-op, medium-op, and finally light-op. Besides, we are able to lock and unlock a portion of our programs with the lock and unlock instructions. Each of these instructions requires an identifier, so that it can be recognized in the scheduler output.

  5. The grammar for such a language is:

    (def r3-language
    "S = INSTRS
      INSTRS = ((INSTR | LOCKED-INSTRS) <optional-whitespace>)*
      INSTR = HEAVY-OP | MEDIUM-OP | LIGHT-OP
      HEAVY-OP = <optional-whitespace> 'heavy-op' <whitespace> ID <SEP>
      MEDIUM-OP = <optional-whitespace> 'medium-op' <whitespace> ID <SEP>
      LIGHT-OP = <optional-whitespace> 'light-op' <whitespace> ID <SEP>
    
      LOCKED-INSTRS = LOCK INSTRS UNLOCK
      LOCK = <optional-whitespace> 'lock' <whitespace> ID <SEP>
      UNLOCK = <optional-whitespace> 'unlock' <whitespace> ID <SEP>
    
      ID = #'[a-zA-Z0-9]+'
      PRIORITY = #'[0-9]+'
    
      whitespace = #'\\s+'
      optional-whitespace = #'\\s*'
      SEP = #'\\s*' ';'")
  6. Note that rules enclosed in angle brackets are hidden from the parse tree, so the whitespace and separator tags, for instance, will not show up in the output.

  7. Let's see what Instaparse outputs for the program we wrote previously. For this, just type the following in your REPL:

    (p  (insta/parse (insta/parser r3-language) 
    "heavy-op op1;
    light-op op2;
    lock l1;
    medium-op op3;
    unlock l1;"))
    ;;=> And you'll get:
    [:S  
     [:INSTRS
      [:INSTR [:HEAVY-OP "heavy-op" [:ID "op1"]]]
      [:INSTR [:LIGHT-OP "light-op" [:ID "op2"]]]
      [:LOCKED-INSTRS
       [:LOCK "lock" [:ID "l1"]]
       [:INSTRS [:INSTR [:MEDIUM-OP "medium-op" [:ID "op3"]]]]
       [:UNLOCK "unlock" [:ID "l1"]]]]]
  8. We need to transform these nested vectors into instructions. First of all, we will make use of the very handy Instaparse function transform to eliminate the rule tags and get a more useful representation of our instructions. The transform function takes a map from tags to functions and, for every node carrying one of these tags, applies the matching function to the elements that follow the tag in that node's vector:

    (defn gen-program
      [parser program]
      (insta/transform
       {:S identity
        :INSTRS (fn [& args] (vec args))
        :INSTR identity
        :HEAVY-OP (fn [x y] {:inst-type :heavy-op :inst-id (get y 1)})
        :MEDIUM-OP (fn [x y] {:inst-type :medium-op :inst-id (get y 1)})
        :LIGHT-OP (fn [x y] {:inst-type :light-op :inst-id (get y 1)})
        :LOCKED-INSTRS (fn [& args] (vec args))
        :LOCK (fn [x y] {:inst-type :lock :inst-id {:lock (get y 1)} })
        :UNLOCK (fn [x y] {:inst-type :unlock :inst-id {:unlock  (get y 1)}})}
    ;;=> The map above tells 'transform' how to transform the elements
    ;;   next to each tag.
       (parser program)))
    ;;=> (parser program) is the raw parse tree emitted by Instaparse.
  9. Here is the output of gen-program. Input the following code in the REPL:

    (p (gen-program (insta/parser r3-language)  
                               "heavy-op op1;
            light-op op2;
            lock l1;
            medium-op op3;
            unlock l1;"))
  10. You'll get the following output:

    [{:inst-type :heavy-op, :inst-id "op1"}
     {:inst-type :light-op, :inst-id "op2"}
     [{:inst-type :lock, :inst-id {:lock "l1"}}
      [{:inst-type :medium-op, :inst-id "op3"}]
      {:inst-type :unlock, :inst-id {:unlock "l1"}}]]
  11. To get rid of the nesting that we still see here, we are going to use a zipper, which is a Clojure facility for walking trees. Basically, we will visit all the elements of the nested vectors and keep only the maps, so that we end up with a nice, flat program structure. As this will be our actual process, we'll also add a process-id attribute and a priority attribute to its output:

    (defn fire-a-process
      [grammar
       program
       process-id
       priority]
      (let [prsr (insta/parser grammar)
            ;;=> The parser.
            vec-instructions (gen-program prsr program)
            ;;=> The nested structure.
            zpr (z/vector-zip vec-instructions)]
        (loop [result []
               loc (-> zpr z/down)]
          (if (z/end? loc)
            ;;=> The end of the recursion: there are no more nodes to
            ;;   visit, so we generate the process.
            {:process-id process-id
             :instructions result
             :priority priority}
            (recur (if (map? (z/node loc))
                     ;;=> We only append elements of type 'map' to result,
                     (conj result (z/node loc))
                     ;;   else we pass result as is in the recursion,
                     result)
                   ;;   and we recur with the next element.
                   (z/next loc))))))
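
To see the zipper walk in isolation, here is a small sketch that flattens a hand-written nested structure without needing a parser. It uses iterate and take-while instead of the loop/recur of the recipe, which is a substitution made here for brevity; nested-program and flatten-instructions are hypothetical names.

```clojure
(require '[clojure.zip :as z])

;; A hand-written stand-in for the output of gen-program.
(def nested-program
  [{:inst-type :heavy-op, :inst-id "op1"}
   [{:inst-type :lock, :inst-id {:lock "l1"}}
    [{:inst-type :medium-op, :inst-id "op3"}]
    {:inst-type :unlock, :inst-id {:unlock "l1"}}]])

(defn flatten-instructions
  "Visits every location of the zipper in depth-first order and keeps
  only the nodes that are maps."
  [nested]
  (->> (z/vector-zip nested)
       (iterate z/next)                 ; all successive locations
       (take-while (complement z/end?)) ; stop once the walk is over
       (map z/node)
       (filterv map?)))                 ; vectors are branches, maps are leaves

(mapv :inst-type (flatten-instructions nested-program))
;;=> [:heavy-op :lock :medium-op :unlock]
```

With vector-zip, only vectors count as branches, so the instruction maps (including the nested {:lock "l1"} identifiers) are treated as leaves and are never descended into.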
  12. Here is a process spawned by our program named :process-1 that has the priority 10. Input the following in your REPL:

    (fire-a-process r3-language 
                                  "heavy-op op1;
               light-op op2;
               lock l1;
               medium-op op3;
               unlock l1;"
                                  :process-1
                                  10)
  13. You'll get the following output:

    {:process-id :process-1,
     :instructions
     [{:inst-type :heavy-op, :inst-id "op1"}
      {:inst-type :light-op, :inst-id "op2"}
      {:inst-type :lock, :inst-id {:lock "l1"}} 
    ;;=> Note that the ':inst-id' of a lock is {:lock id} and that of an
    ;;   unlock is {:unlock id}, so that locking and unlocking
    ;;   instructions are not mistaken for one another.
      {:inst-type :medium-op, :inst-id "op3"}
      {:inst-type :unlock, :inst-id {:unlock "l1"}}],
     :priority 10}
  14. Now, we need to set effort for each of our instructions, that is, how many processor cycles each one of them takes to be executed:

    (def insts-effort {:heavy-op 10 :medium-op 5 :light-op 2 :lock 1 :unlock 1})
  15. Now we'll concern ourselves with locking. First of all, we need to find the indices of locking instructions in our instructions vector:

    (defn all-locks-indices [instructions]
      ;;=> 'instructions' is the ':instructions' vector of the output of
      ;;   fire-a-process.
      (let [locks (filter #(= (:inst-type %) :lock)
                          instructions)
            ;;=> We find all the locks in 'instructions'.
            lock-indices (map (fn [l] {:lock-id (l :inst-id)
                                       :lock-idx (.indexOf instructions l)})
                              locks)]
        ;;=> For every lock, we find its index in 'instructions'
        ;;   and prepare a map with it.
        lock-indices))
    ;;=> The output of this is: ({:lock-id {:lock "l1"}, :lock-idx 2})
  16. With our locks recognized, we can tell which locks every instruction depends on. This is basically done by finding out which locks have indices lower than the instruction's index:

    (defn the-locks-inst-depends-on
      [instructions instruction]
      (let [the-inst-idx (.indexOf instructions instruction)
            the-lock-idxs (all-locks-indices instructions)]    
        (into []  (->> the-lock-idxs
                       (filter #(> the-inst-idx (:lock-idx %) ))
                       (map :lock-id)))))
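
The index comparison can be tried out on a small instruction vector. The following is a compact, self-contained restatement of the same rule, written with keep-indexed rather than with the two functions of the recipe; sample-instructions and guarding-locks are hypothetical names.

```clojure
(def sample-instructions
  [{:inst-type :heavy-op, :inst-id "op1"}
   {:inst-type :lock, :inst-id {:lock "l1"}}
   {:inst-type :medium-op, :inst-id "op3"}
   {:inst-type :unlock, :inst-id {:unlock "l1"}}
   {:inst-type :light-op, :inst-id "op4"}])

(defn guarding-locks
  "Returns the ids of the locks placed before 'instruction'."
  [instructions instruction]
  (let [inst-idx (.indexOf ^java.util.List instructions instruction)]
    (->> instructions
         (keep-indexed (fn [idx inst]
                         (when (and (= :lock (:inst-type inst))
                                    (< idx inst-idx))
                           (:inst-id inst))))
         (into []))))

(guarding-locks sample-instructions (sample-instructions 0))
;;=> []
(guarding-locks sample-instructions (sample-instructions 2))
;;=> [{:lock "l1"}]
(guarding-locks sample-instructions (sample-instructions 4))
;;=> [{:lock "l1"}]
```

Since only positions are compared, an instruction placed after the unlock (op4 here) is still reported as depending on the lock; whether it is actually blocked is decided later from the state of the lock.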
  17. We'll need a map that maintains the state of the locks, so that the scheduler can track the locking and unlocking activities during the program execution. We'll define the lock and unlock functions to do this:

    (defn lock  
      "locks lock lock-id in locks map"
      [locks process-id lock-id]
      (assoc locks lock-id {:locker process-id :locked true}))
    (defn unlock
      "unlocks lock lock-id in locks map"
      [locks process-id lock-id]
      (assoc locks lock-id {:locker process-id :locked false}))
    ;;=> The state of each lock contains whether it is locked and which
    ;;   process last set it.
  18. The locker process information manipulated in the previous step is important. As a process's instruction can only be denied access to a shared resource by locks set by other processes, we need to track which process is locking what. The is-locked? function relies on this mechanism to tell whether an instruction is locked at a given point in time, in which case it cannot be fired by the scheduler:

    (defn is-locked?
      [process-id
       instructions
       locks  
       instruction]
      (let [inst-locks (the-locks-inst-depends-on instructions instruction)]
        (some true? (map #(and (not= process-id ((get locks %) :locker))
                               ((get locks %) :locked))
                         inst-locks))))
    ;;=> If some of the locks the instruction depends on are locked
    ;;   (:locked true) and the locker is not its own process, then the
    ;;   instruction is considered locked.
  19. Let's focus on the scheduler now. Imagine that some parts of a process have already been assigned some quanta of time. We need to maintain a state for all the processes, recording the parts that have been processed so far. We'll call this state scheduled. Let's say that it should look like the following:

    [{:process-id :process-1
      :instructions
      [{:times [1 2 3], :inst-id "op1", :inst-type :heavy-op}
       {:times [4 5 6], :inst-id "op2", :inst-type :medium-op}]}
     {:process-id :process-2
      :instructions
      [{:times [7 8], :inst-id "op1", :inst-type :heavy-op}
       {:times [9 10], :inst-id "op2", :inst-type :medium-op}]}]
    ;;=> ':times' contains the vector of the time quanta allocated to
    ;;   the instruction.
  20. We'll prepare a helper function, scheduled-processes-parts, that'll count the number of quanta already allocated, and this will be handy in knowing whether an instruction is complete:

    (defn scheduled-processes-parts
      [scheduled]
      (into [] (map (fn [p] {:process-id (:process-id p)
                             :instructions (into []
                                                 (map (fn [i] {:inst-id (:inst-id i)
                                                               :inst-type (:inst-type i)
                                                               :count (count (:times i))})
                                                      (:instructions p)))})
                    scheduled)))
    ;;=> This function replaces ':times' with ':count n' in the maps
    ;;   maintained in "scheduled".
  21. We'll use this function to implement incomplete-instruction?, incomplete-process?, and more-incomplete-processes? that we'll use later on:

    (defn incomplete-instruction?
      [instruction-w-count]
      (let [instr-effort (insts-effort (instruction-w-count :inst-type))
            instr-count (instruction-w-count :count)]
        (< instr-count instr-effort)))
    (defn incomplete-process?
      [process-w-counts]
      (let [instrs-w-count (process-w-counts :instructions)]
        (some true? (map incomplete-instruction?
                         instrs-w-count))))
    (defn more-incomplete-processes?
      [processes-w-count]
      (some true? (map incomplete-process?
                       processes-w-count)))
    ;;=> processes-w-count is the "scheduled" state after it has gone
    ;;   through scheduled-processes-parts.
  22. Diving deeper into the implementation, let's now look at a single process and define a function that finds which instruction is to be fired if the scheduler decides to allocate a quantum to it. This translates to the first incomplete instruction if it is non-locked, that is, none of its locks have been set to locked by another process:

    (defn find-inst-to-be-fired-in-process
      [locks
       process-id
       the-process-instructions
       the-process-scheduled-parts]
      (let [p-not-locked-instrs (set (->> the-process-instructions
                                          (filter #(not (is-locked? process-id
                                                                    the-process-instructions
                                                                    locks
                                                                    %)))))
            ;;=> A set of not locked instructions.
            p-incomplete-instrs (set (->> (:instructions the-process-scheduled-parts)
                                          (filter incomplete-instruction?)
                                          (map #(dissoc % :count))))
            ;;=> A set of incomplete instructions.
            fireable-instrs (clojure.set/intersection p-not-locked-instrs
                                                      p-incomplete-instrs)
            ;;=> Their intersection.
            instr-id-to-fire (->> fireable-instrs
                                  (sort-by #(.indexOf the-process-instructions %) <)
                                  (first)
                                  (:inst-id))]
        ;;=> The first one of them.
        instr-id-to-fire))
  23. Now, let's write progress-on-process!, which considers one particular process, fires its fireable instruction (as found by the preceding function), and updates the locks and scheduled states. This is quite a long function, as it is the heart of the scheduler:

    (defn progress-on-process!
      [locks-ref
       scheduled-ref
       the-process
       quantum]
      (let [the-process-instrs (the-process :instructions)
            processes-scheduled-parts (scheduled-processes-parts @scheduled-ref)
            the-process-scheduled-parts (->> processes-scheduled-parts
                                             (filter #(= (:process-id %)
                                                         (:process-id the-process)))
                                             (first))]
        ;;=> Here we prepare the processes' scheduled parts and take only
        ;;   the ones relevant to this particular 'process-id'.
        (if-let [the-instr-to-fire-id (find-inst-to-be-fired-in-process
                                       @locks-ref
                                       (:process-id the-process)
                                       the-process-instrs
                                       the-process-scheduled-parts)]
          ;;=> If there is an instruction in "process-id" to be fired,
          (dosync
           ;;=> we use the refs, because we need to do transactions
           ;;   involving both "scheduled" and "locks".
           (let [the-instr-to-fire (->> the-process-instrs
                                        (filter #(= (:inst-id %)
                                                    the-instr-to-fire-id))
                                        (first))]
             ;;=> We get the entry relevant to this instruction id.
             (cond
               (= (:inst-type the-instr-to-fire) :lock)
               (alter locks-ref
                      lock
                      (:process-id the-process)
                      the-instr-to-fire-id)
               (= (:inst-type the-instr-to-fire) :unlock)
               (alter locks-ref
                      unlock
                      (:process-id the-process)
                      {:lock (:unlock the-instr-to-fire-id)})))
           ;;=> If it is a "lock" or an "unlock", we update the "locks"
           ;;   state map.
           (let [p-in-scheduled (->> @scheduled-ref
                                     (filter #(= (:process-id %)
                                                 (:process-id the-process)))
                                     (first))
                 ;;=> To update the "scheduled" ref, we begin by finding
                 ;;   the process in the processes vector.
                 instr-in-p-in-scheduled (->> (get p-in-scheduled :instructions)
                                              (filter #(= (:inst-id %) the-instr-to-fire-id))
                                              (first))
                 ;;=> Then we find the instruction in this process.
                 idx-p-in-scheduled (max 0 (.indexOf @scheduled-ref
                                                     p-in-scheduled))
                 idx-inst-in-p-in-scheduled (max 0
                                                 (.indexOf (get p-in-scheduled :instructions)
                                                           instr-in-p-in-scheduled))
                 ;;=> We compute the index of the instruction, or we set it
                 ;;   to 0 if it is not found, which means that it is the
                 ;;   first time it is scheduled.
                 times-in-inst-in-p-in-scheduled (get
                                                  (get (p-in-scheduled :instructions)
                                                       idx-inst-in-p-in-scheduled)
                                                  :times)
                 ;;=> We get the times vector in "scheduled" related to
                 ;;   this instruction.
                 _ (alter scheduled-ref assoc-in
                          [idx-p-in-scheduled :instructions idx-inst-in-p-in-scheduled :times]
                          (conj times-in-inst-in-p-in-scheduled quantum))])
           ;;=> And using assoc-in, with indices and keys as a "path
           ;;   vector", we update the "scheduled" ref with the times
           ;;   vector, to which we append the current "quantum".
           true)
          ;;=> If we were able to find a fireable instruction, we issue
          ;;   "true".
          false)))
    ;;=> Else we issue "false".
  24. The following functions will help us prepare empty locks and scheduled maps, which are to be used by progress-on-process!:

    (defn prepare-scheduled
      [processes]
      (into [] (->> processes
                    (map (fn [p] {:process-id (:process-id p)
                                  :instructions (into []
                                                      (->> (:instructions p)
                                                           (map (fn [i] (assoc i :times [])))))})))))
    ;;=> We prepare "scheduled" as being the same thing as the
    ;;   "processes" vector, with empty ":times" vectors added.
    (defn prepare-locks-for-a-p
      [a-process]
      (let [locks (filter #(= (:inst-type %) :lock)
                          (:instructions a-process))]
        (reduce (partial apply unlock)
                {}
                (map (fn [l] [(:process-id a-process)
                              (:inst-id l)])
                     locks))))
    ;;=> A helper function that prepares the "locks" of a process, all
    ;;   of them unlocked (:locked false).
    (defn prepare-locks
      [processes]
      (reduce merge (map prepare-locks-for-a-p processes)))
    ;;=> Applying "prepare-locks-for-a-p", we generate the locks for all
    ;;   the processes that would run concurrently.
  25. Equipped with all these functions, we must address the problem of process selection for the allocation of each quantum of time. We must give each process an opportunity to access the scheduler quanta according to its priority. For this purpose, we will construct an infinite sequence that repeats each process ID as many times as its priority value. Within each repetition of this pattern, a process with a higher priority always comes before one with a lower priority. Suppose we have process p1 with priority 3, process p2 with priority 2, and process p3 with priority 1; the sequence produced by this cycling would be:

    [p1 p1 p1 p2 p2 p3 p1 p1 p1 p2 p2 p3....]
  26. As the time quanta flow, the scheduler picks the next element of this weighted cycling sequence at each step, so that every process receives a share of quanta proportional to its priority.

  27. The following function creates the priority-weighted cycle of processes:

    (defn gen-processes-cycles
      [processes]  
      (let [sorted-procs-by-prio (sort-by :priority > processes)
            procs-pattern (mapcat #(repeat (:priority %)
                                           %)
                                  sorted-procs-by-prio)]
    ;;=> A pattern is a single repetition "priority" times of each 
    ;;   process
        (cycle procs-pattern)))
    ;;=> Generates an infinite sequence like we described above.
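
The p1, p2, p3 example given earlier can be reproduced with a few lines. This sketch cycles through process IDs rather than whole process maps, a simplification made here to keep the output readable; weighted-cycle and sample-processes are hypothetical names.

```clojure
(def sample-processes
  [{:process-id :p3 :priority 1}
   {:process-id :p1 :priority 3}
   {:process-id :p2 :priority 2}])

(defn weighted-cycle
  "Infinite sequence of process ids in which each id is repeated
  'priority' times per round, highest priority first."
  [processes]
  (->> processes
       (sort-by :priority >)
       (mapcat #(repeat (:priority %) (:process-id %)))
       (cycle)))

;; The sequence is infinite, so we only ever take a finite prefix.
(take 12 (weighted-cycle sample-processes))
;;=> (:p1 :p1 :p1 :p2 :p2 :p3 :p1 :p1 :p1 :p2 :p2 :p3)
```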
  28. Locking programs may lead to infinite waiting. To tackle this problem, we will set a timeout for our scheduler, which will be twice the time needed by all the processes if they were to be executed sequentially, one after the other. The following function computes this sequential time for a single process:

    (defn process-sequential-time
      [a-process]
      (let [instructions (a-process :instructions)
            inst-types (map :inst-type instructions)
            lengths (map #(get insts-effort %) inst-types)]
        (reduce + lengths)))
    ;;=> We get the instruction types, grab their efforts from the
    ;;   "insts-effort" map, and sum them all up using reduce.
  29. Finally, we can write our scheduler. While there are incomplete processes left to be scheduled and the current quantum has not reached the timeout, the scheduler goes through the weighted process cycle, picks one process, and calls progress-on-process! on it. Note that we launch this on several programs, as we are implementing time-sharing to simulate multithreading:

    (defn schedule-programs
      [language programs]
      ;;=> programs is a vector of maps: {:program "the textual program"
      ;;                                  :process-id the-process-id
      ;;                                  :priority the-process-priority}
      (let [processes (into [] (map #(fire-a-process language
                                                     (:program %)
                                                     (:process-id %)
                                                     (:priority %))
                                    programs))
            ;;=> The processes are constructed.
            timeout (* 2 (reduce + (map process-sequential-time
                                        processes)))
            ;;=> "timeout" is twice the total length of all the processes
            ;;   executed one after the other.
            locks (ref (prepare-locks processes))
            scheduled (ref (prepare-scheduled processes))
            processes-cycles (gen-processes-cycles processes)]
        ;;=> We prepare the "locks" and "scheduled" refs, and the weighted
        ;;   process repetitions that the scheduler will have to cycle
        ;;   through.
        (loop [quantum 0
               remaining-processes processes-cycles]
          ;;=> We loop.
          (if (and (more-incomplete-processes? (scheduled-processes-parts @scheduled))
                   (< quantum timeout))
            (do
              (progress-on-process! locks scheduled
                                    (first remaining-processes)
                                    quantum)
              ;;=> We progress on the selected process, with the current
              ;;   "quantum".
              (recur (inc quantum)
                     (next remaining-processes)))
            ;;=> We go to the next iteration, incrementing the quantum and
            ;;   cycling through the weighted process cycle.
            @scheduled))))

Now, let's define two sample programs and see how they perform. First, define them in your REPL:

(def programs
[{:priority 3,
  :program
  "heavy-op op1;light-op op2;lock l1;medium-op op3;unlock l1;",
  :process-id :pr1}
 {:priority 1,
  :program "lock l1;medium-op op4;unlock l1;medium-op op5;",
  :process-id :pr2}])

Now, launch schedule-programs:

(p  (schedule-programs r3-language programs))

By launching it, you'll get the following output:

[{:process-id :pr1,
  :instructions
  [{:times [0 1 2 4 5 6 8 9 10 12],
    :inst-type :heavy-op,
    :inst-id "op1"}
   {:times [13 14], :inst-type :light-op, :inst-id "op2"}
   {:times [16], :inst-type :lock, :inst-id {:lock "l1"}}
   {:times [17 18 20 21 22], :inst-type :medium-op, :inst-id "op3"}
   {:times [24], :inst-type :unlock, :inst-id {:unlock "l1"}}]}
 {:process-id :pr2,
  :instructions
  [{:times [3], :inst-type :lock, :inst-id {:lock "l1"}}
   {:times [7 11 15 27 31], :inst-type :medium-op, :inst-id "op4"}
   {:times [35], :inst-type :unlock, :inst-id {:unlock "l1"}}
   {:times [39 43 47 51 55], :inst-type :medium-op, :inst-id "op5"}]}]
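
The figures in this output can be cross-checked with a little arithmetic. The sketch below recomputes the timeout from the efforts defined earlier and inspects the quanta that :pr2 received, copied from the output above; sequential-time, pr1-time, pr2-time, timeout, and pr2-times are hypothetical names used only here.

```clojure
(def insts-effort {:heavy-op 10 :medium-op 5 :light-op 2 :lock 1 :unlock 1})

(defn sequential-time
  "Sums the efforts of a sequence of instruction types."
  [inst-types]
  (reduce + (map insts-effort inst-types)))

(def pr1-time (sequential-time [:heavy-op :light-op :lock :medium-op :unlock]))
;; 10 + 2 + 1 + 5 + 1 = 19
(def pr2-time (sequential-time [:lock :medium-op :unlock :medium-op]))
;; 1 + 5 + 1 + 5 = 12
(def timeout (* 2 (+ pr1-time pr2-time)))
;; 2 * (19 + 12) = 62

;; The ':times' vectors of :pr2, copied from the output above.
(def pr2-times [[3] [7 11 15 27 31] [35] [39 43 47 51 55]])

;; With priorities 3 and 1, the pattern is pr1 pr1 pr1 pr2, so :pr2 is
;; only ever offered quanta 3, 7, 11, and so on.
(every? #(= 3 (mod % 4)) (flatten pr2-times))
;;=> true

;; The last quantum used stays below the timeout.
(< (apply max (flatten pr2-times)) timeout)
;;=> true
```

Quanta 19 and 23 are missing from op4: they were offered to :pr2 while :pr1 held l1 (locked at quantum 16, unlocked at quantum 24), so nothing in :pr2 could be fired at those steps.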
