2010-02-21

Braun Trees

I had a couple of interesting comments on my post about heaps and priority queues. The first, by Jérôme, is an indictment of my naïve (and wrong) use of Obj.magic to fake initializing extensible Vecs in the presence of double-word unboxed float arrays. There are a number of avenues to repair the defect:

  1. Use a functorial interface and monomorphize the type of vectors
  2. Use an exemplar of the intended type to initialize the backing array
  3. Specialize the code for float arrays
  4. Use an explicit box to store values, most probably an option

A second reader, Damien Guichard, suggested sidestepping the problem entirely by using a dynamic data structure, the Braun tree, as a flexible array. There aren't many online references about the data structure, the most notable being Okasaki's "Three Algorithms on Braun Trees" (Functional Pearl), but Damien provided code that served as a starting point. A Braun tree is a complete binary tree with the property that, for any given node, the left child has exactly the same size as the right child or exactly one more element than it. In other words, a Braun tree of size n + 1 is a node and two children l and r, both Braun trees, such that |l| = ⌈n/2⌉ and |r| = ⌊n/2⌋, which makes |r| ≤ |l| ≤ |r| + 1. As Okasaki remarks, Braun trees always have minimum height, and their shape is determined only by the number of nodes in them.

There is a choice between an imperative implementation and a purely functional one. Damien's code in the comment is imperative, and I found that rewriting it to introduce tail-recursion was tedious and error-prone; Okasaki's "exceptionally simple and elegant" algorithms devolve into a tangle of special cases. I opted for a purely functional implementation upon which to build an imperative, mutable interface compatible with the previous version of the priority queue. The important thing is to maintain the tree invariant in every case. I'll use a different signature than before, one specific to this data structure:

module Braun : sig
  type 'a t
  val empty     : 'a t
  val singleton : 'a -> 'a t
  val is_empty  : 'a t -> bool
  val min       : 'a t -> 'a
  val get       : 'a t -> int -> 'a
  val size      : 'a t -> int
  val rep       : ('a -> 'a -> int) -> 'a -> 'a t -> 'a t
  val ins       : ('a -> 'a -> int) -> 'a -> 'a t -> 'a t
  val del       : ('a -> 'a -> int) -> 'a t -> 'a t
end = struct (* … *)

Note that rep, ins and del take a comparison function to maintain the heap property, namely, that the root of the tree is less than the elements in either child. This is unsafe, as nothing precludes passing different comparison functions to manipulate the tree; this is why I will wrap it in the final Heap signature. That said, the constructor for trees is a straightforward binary tree ADT:

type 'a t = E | N of 'a * 'a t * 'a t
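As a sanity check while developing, the Braun property can be stated directly on this type. The following is an illustrative helper, not part of the module (naive_size and is_braun are my names), with the type repeated so the sketch stands alone:

```ocaml
type 'a t = E | N of 'a * 'a t * 'a t

(* Naive O(n) size, used only for checking. *)
let rec naive_size = function
| E           -> 0
| N (_, l, r) -> 1 + naive_size l + naive_size r

(* A tree is Braun iff |r| <= |l| <= |r| + 1 holds at every node. *)
let rec is_braun = function
| E           -> true
| N (_, l, r) ->
  let sl = naive_size l and sr = naive_size r in
  sr <= sl && sl <= sr + 1 && is_braun l && is_braun r
```

Running is_braun after every operation during a test run is a cheap way to catch a broken invariant early.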

Construction, testing for emptiness and returning the minimum are trivial:

let empty       = E
and singleton x = N (x, E, E)

let is_empty = function E -> true | N _ -> false

let min = function
| N (e, _, _) -> e
| E           -> failwith "empty heap"

To calculate the size of a Braun tree I follow Okasaki verbatim:

let rec diff h n = match h with
| E           when n = 0 -> 0
| E                      -> assert false
| N (_, _, _) when n = 0 -> 1
| N (_, l, r)            ->
  if n mod 2 = 1 then diff l ((n - 1) / 2) else diff r ((n - 2) / 2)

let rec size = function
| E           -> 0
| N (_, l, r) -> let m = size r in 2 * m + 1 + diff l m

For the details I defer to the explanation in the paper; the idea is to avoid traversing the entire tree and exploiting the Braun property by calculating the size from the shape of each left child of the right spine; this reduces the complexity to O(log² n). For completeness, Braun trees support random access in logarithmic time:

let rec get h i = match h with
| E                      -> failwith "index out of bounds"
| N (e, _, _) when i = 0 -> e
| N (_, l, r)            ->
  if i mod 2 = 1 then get l ((i - 1) / 2) else get r ((i - 2) / 2)
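To see the indexing at work: odd indices descend into the left child, even indices into the right. The following standalone sketch repeats diff, size and get from above; the tree t (my own illustrative example) stores 0 through 4 in Braun order:

```ocaml
type 'a t = E | N of 'a * 'a t * 'a t

let rec diff h n = match h with
| E           when n = 0 -> 0
| E                      -> assert false
| N (_, _, _) when n = 0 -> 1
| N (_, l, r)            ->
  if n mod 2 = 1 then diff l ((n - 1) / 2) else diff r ((n - 2) / 2)

let rec size = function
| E           -> 0
| N (_, l, r) -> let m = size r in 2 * m + 1 + diff l m

let rec get h i = match h with
| E                      -> failwith "index out of bounds"
| N (e, _, _) when i = 0 -> e
| N (_, l, r)            ->
  if i mod 2 = 1 then get l ((i - 1) / 2) else get r ((i - 2) / 2)

(* The Braun tree holding 0..4: element i is stored at index i. *)
let t = N (0, N (1, N (3, E, E), E), N (2, N (4, E, E), E))
```

Here size t computes 5 by inspecting only the right spine, and get t i returns i for every index.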

The meat of the stew is the heap operations: insertion, and deletion or replacement of the minimum. All three operations must simultaneously keep two invariants: the Braun property and the heap property. Replacing the minimum doesn't change the shape of the tree, only the values stored in it, so it only has to maintain the heap property. The algorithm is the functional analog to the binary heap's siftdown:

let rec rep compare e = function
| E           -> failwith "empty heap"
| N (_, l, r) -> siftdown compare (N (e, l, r))

and siftdown compare n = match n with
| N (e, (N (el, _, _) as l), E) ->
  if compare e  el < 0 then n
    else N (el, rep compare e l, E)
| N (e, (N (el, _, _) as l), (N (er, _, _) as r)) ->
  if compare e  el < 0
  && compare e  er < 0 then n else
  if compare el er < 0
    then N (el, rep compare e l, r)
    else N (er, l, rep compare e r)
| _ -> n

It performs a case analysis to determine the least among the left and the right nodes (if the latter exists) in order to swap the greater root value down the tree. In contrast, insertion does change the shape of the tree. From |r| ≤ |l| ≤ |r| + 1 we can conclude that |l| ≤ |r| + 1 ≤ |l| + 1; this means that inserting on the right child and exchanging it with the left one maintains the Braun property. The value in the root bubbles down to its proper place to maintain the heap property:

let rec ins compare x = function
| E           -> singleton x
| N (e, l, r) ->
  if compare x e < 0
    then N (x, ins compare e r, l)
    else N (e, ins compare x r, l)

This definitely is elegant. Deletion is almost as streamlined, but with a twist:

let rec del compare = function
| E           -> failwith "empty heap"
| N (_, t, E) -> t
| N (_, E, _) -> assert false
| N (_, (N (e, _, _) as l), (N (e', _, _) as r)) ->
  if compare e e' < 0
    then N (e ,               r, del compare l)
    else N (e', rep compare e r, del compare l)

The first and second cases are the trivial base cases. By the Braun property, if the right child is empty the left child can hold at most one item, and the third case exhausts the pattern by asserting the impossibility. The fourth and last case is the recursive step. To eliminate a node while maintaining the Braun property we must essentially undo the swap performed by insertion, deleting on the left child and exchanging it with the right one. To maintain the heap property, however, we must select the lesser of both children. If it happens to be the left one we can do just that. If it is the right one we can't blindly delete its minimum, as we could end up violating the Braun invariant by widening the size difference to 2. This makes it necessary to replace the minimum on the right with the minimum on the left, which in effect amounts to a tree rotation. With this the module is complete:

end
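Before wrapping it, the purely functional interface can be exercised on its own. The following heapsort sketch repeats the core operations verbatim so it runs standalone; braun_sort is my name for the illustrative driver, not part of the module:

```ocaml
type 'a t = E | N of 'a * 'a t * 'a t

let rec rep compare e = function
| E           -> failwith "empty heap"
| N (_, l, r) -> siftdown compare (N (e, l, r))

and siftdown compare n = match n with
| N (e, (N (el, _, _) as l), E) ->
  if compare e el < 0 then n else N (el, rep compare e l, E)
| N (e, (N (el, _, _) as l), (N (er, _, _) as r)) ->
  if compare e el < 0 && compare e er < 0 then n else
  if compare el er < 0
    then N (el, rep compare e l, r)
    else N (er, l, rep compare e r)
| _ -> n

let rec ins compare x = function
| E           -> N (x, E, E)
| N (e, l, r) ->
  if compare x e < 0
    then N (x, ins compare e r, l)
    else N (e, ins compare x r, l)

let rec del compare = function
| E           -> failwith "empty heap"
| N (_, t, E) -> t
| N (_, E, _) -> assert false
| N (_, (N (e, _, _) as l), (N (e', _, _) as r)) ->
  if compare e e' < 0
    then N (e ,               r, del compare l)
    else N (e', rep compare e r, del compare l)

(* Insert everything, then repeatedly extract the minimum: heapsort. *)
let braun_sort compare xs =
  let rec drain acc = function
  | E                -> List.rev acc
  | N (e, _, _) as h -> drain (e :: acc) (del compare h)
  in
  drain [] (List.fold_left (fun h x -> ins compare x h) E xs)
```

Both phases are O(n log n), and every intermediate tree satisfies both the Braun and the heap invariants.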

To build an imperative heap on Braun trees is a simple matter of wrapping the module, as I've remarked above:

module Heap : sig
  type 'a t
  val make       : ('a -> 'a -> int) -> 'a t
  val is_empty   : 'a t -> bool
  val length     : 'a t -> int
  val min        : 'a t -> 'a
  val replacemin : 'a t -> 'a -> unit
  val removemin  : 'a t -> unit
  val insert     : 'a t -> 'a -> unit
end = struct
  type 'a t = { compare : 'a -> 'a -> int; mutable root : 'a Braun.t; }
  let make compare = { compare = compare; root = Braun.empty }
  let is_empty   h   = Braun.is_empty h.root
  let length     h   = Braun.size h.root
  let min        h   = Braun.min h.root
  let replacemin h x = h.root <- Braun.rep h.compare x h.root
  let removemin  h   = h.root <- Braun.del h.compare   h.root
  let insert     h x = h.root <- Braun.ins h.compare x h.root
end

This is exactly the same interface as before. That said, I'm not very clear on the advantages of Braun trees compared to, say, binomial heaps or pairing heaps supporting merge operations in constant time. In any case, if you need the fastest priority queues and you don't mind using an imperative implementation, array-based binary heaps are the simplest, most understood data structure around. You just need a good implementation of extensible arrays.

As an aside, I have a confession to make. To reach this result I refactored the code several times, and at each step I checked that I hadn't broken the Braun heap invariants by running a unit test fixture against every change, with the help of a modified version of this simple test harness. I'm not a fan of TDD, as I prefer the logic and invariants to guide me in coding. The tests were a good sanity check that quickly told me whether I had been careful all along or had made a mistake. In this sense they are a good safety net for exploratory programming, after the type-checker and the compiler verify that everything is right. I stress that I still think rigorous reasoning about the code comes first, types come second, and exploratory programming using tests follows from that.

2010-02-16

A Heap of (Regular) Strings

Once you have a priority queue, suddenly it gets put to use more than once. Reading Pete Manolios' "Enumerating the strings of a Regular Expression", I found (§ 3.4) the following single-paragraph explanation of an algorithm for generating all the strings accepted by a DFA:

Another approach is to […] enumerate the expression using the DFA. This can be done with a function enumDfa whose single argument is a list of pairs, where each pair consists of a string and a state in the DFA. We maintain the invariant that the pairs in the list are sorted with respect to ≤, using their strings as keys. In addition, the path through the DFA determined by the string of a pair leads to the state of the pair. The initial call of enumDfa consists of the list with the single pair consisting of the empty string and the start state. As long as its argument is not empty, enumDfa removes the pair at the head of the list. If it contains an accepting state, the string is added to the enumeration. In either case, the pairs consisting of states reachable in a single step and their corresponding strings are inserted into the list, which is the argument to the recursive call of enumDfa.

Here ≤ is a regular preorder as defined in the paper; in order for the enumeration to exhaust all possible recognized strings, shorter strings must compare earlier than longer ones. I generalize a bit and use lists:

let regular_compare cmp (l : 'a list) (r : 'a list) =
  let rec go c = function
  | [], [] ->  c
  | [], _  -> -1
  | _ , [] ->  1
  | x :: xs, y :: ys -> go (if c = 0 then cmp x y else c) (xs, ys)
  in go 0 (l, r)
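A quick illustrative check of the resulting shortlex order (repeating regular_compare so the sketch runs standalone):

```ocaml
let regular_compare cmp l r =
  let rec go c = function
  | [], [] ->  c
  | [], _  -> -1
  | _ , [] ->  1
  | x :: xs, y :: ys -> go (if c = 0 then cmp x y else c) (xs, ys)
  in go 0 (l, r)

let () =
  (* a shorter string compares earlier, even if lexicographically later *)
  assert (regular_compare compare ['b'] ['a'; 'a'] < 0);
  (* equal lengths fall back to the first elementwise difference *)
  assert (regular_compare compare ['a'; 'b'] ['a'; 'a'] > 0)
```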

Note how the list comparison is based on an elementwise comparison, and once corresponding elements are found not to match it is still necessary to recur on the tail to look for a potential difference in length that overrides a difference in content. I represent DFAs by nodes that can be terminal or not, and that are followed by a list of labeled successor edges:

type 'a dfa = { final : bool; succ : ('a * 'a dfa) list }

This simple representation suffices to generate regular strings. Manolios's algorithm is straightforward, if we replace the list sorted by its first component (the shortest string generated so far) by a binary heap:

let enum_dfa cmp (f : 'a list -> unit) (g : 'a dfa) =
  let h = Heap.make (fun (w, _) (w', _) -> regular_compare cmp w w') in
  Heap.insert h ([], g);
  while not (Heap.is_empty h) do
    let (w, q) = Heap.min h in
    Heap.removemin h;
    if q.final then f w;
    List.iter (fun (a, q) -> Heap.insert h (w @ [a], q)) q.succ
  done

The heap initially contains the start state of the DFA labeled by the empty string. This shortest string is repeatedly taken out of the heap together with the state reachable by it. If this state is final, the string is recognized and supplied to the enumerating function f. All the edges leading out of said state augment the current string by one more character. This algorithm in fact performs a level-first search of the DFA. As it is, the enumeration is extremely simple but crude and too low-level. It is not difficult to wrap it in an unfold with rather better functional structure, besides gaining the ability to control the number of strings visited:

let unfold_iterator f e (iter : ('a -> unit) -> 'b -> unit) o =
  let arg = ref e
  and res = ref [] in
  begin try
    iter (fun x -> match f (!arg, x) with
    | Some (e, y) -> arg := e; res := y :: !res
    | None        -> raise Exit) o
  with Exit -> () end;
  List.rev !res

With this, stopping after generating N strings is a one-liner:

let take_iterator n =
  unfold_iterator (fun (i, x) -> if i > 0 then Some (i-1, x) else None) n

as is taking the first N strings recognized by a textual DFA directly as character strings:

let implode l =
  let b = Buffer.create 8 in
  List.iter (Buffer.add_char b) l;
  Buffer.contents b

let take_str n dfa = List.map implode (take_iterator n (enum_dfa compare) dfa)

As a first example, here is the DFA recognizing Doug McIlroy's sandwich language ab*a:

let sandwich =
  let     s2 = { final = true ; succ = []; } in
  let rec s1 = { final = false; succ = [('b', s1); ('a', s2)]; } in
  let     s0 = { final = false; succ = [('a', s1)]; } in
  s0

Its first six words are:

# take_str 6 sandwich ;;
- : string list = ["aa"; "aba"; "abba"; "abbba"; "abbbba"; "abbbbba"]

As a second example, here is the DFA recognizing Doug McIlroy's even A's language (ab*a|b)*:

let even_a =
  let rec s1 = { final = false; succ = [('a', s0); ('b', s1)]; }
      and s0 = { final = true ; succ = [('b', s0); ('a', s1)]; } in
  s0

Its first 20 words are:

# take_str 20 even_a ;;
- : string list =
[""; "b"; "aa"; "bb"; "aab"; "aba"; "baa"; "bbb"; "aaaa"; "aabb"; "abab";
 "abba"; "baab"; "baba"; "bbaa"; "bbbb"; "aaaab"; "aaaba"; "aabaa"; "aabbb"]

Note that the brevity of the enumeration procedure has nothing to do with functional programming or the expressiveness of OCaml: the same solution would translate almost verbatim to an imperative language like C, modulo the explicit creation and destruction of the heap and its elements. Also, note how graph traversals are one-to-one with the data structure used to keep intermediate state: with a stack you have DFS, with a queue you have BFS, and with a priority queue you have some kind of LFS, depending on the underlying order chosen.

2010-02-14

Heap Up (the Bernstein Sums)

Happy Valentine's! The latest Programming Praxis calls for an implementation of the prime Sieve of Atkin. This sieve saves a bunch of operations with a preprocessing phase in which only square-free candidates are retained for later sieving. For this to be efficient it is necessary to quickly count solutions to certain binary quadratic forms k·x² ± y² = n. Dan Bernstein has published an elegant algorithm to quickly generate the solutions to such types of polynomial sums. It crucially requires a priority queue to ensure that the solutions are generated in order and that none is left out. The purpose of this post is to show an implementation of an imperative priority queue using a binary heap.

A binary heap is a sorted data structure over an ordered set (S, ≤): it is a complete binary tree such that every node is less than or equal to either of its children. The first property is usually called the shape property and amounts to requiring that every level h except possibly the last has exactly 2^h nodes; the second property is called the heap property. The shape property makes it convenient (and usual) to represent binary heaps with N elements as arrays of size N: a node with index 1 ≤ j ≤ N has children with indices 2·j and 2·j + 1, in that order. The heap property is maintained by two operations siftup and siftdown that bubble up or down elements not necessarily in order into their proper position.
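Since the implementation below stores the heap in a 0-based array, the index relations shift by one. As a small sketch (parent, left and right are illustrative helpers of my own, not used in the code):

```ocaml
(* Index arithmetic for a 0-based array-backed binary heap.
   Node i corresponds to 1-based node i + 1, so its parent is
   (i + 1) / 2 - 1, which is -1 for the root. *)
let parent i = (i + 1) / 2 - 1
let left   i = 2 * i + 1
let right  i = 2 * i + 2
```

Note that parent 0 is -1, a convenient sentinel for "no parent" that siftup below relies on.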

Since a heap is a dynamic structure, I will need a dynamic array or vector to back it up. The following signature will be sufficient for now; I will give the implementation later:

module Vec : sig
  type 'a t
  val make   : unit -> 'a t
  val length : 'a t -> int
  val get    : 'a t -> int -> 'a
  val set    : 'a t -> int -> 'a -> unit
  val push   : 'a t -> 'a -> unit
  val pop    : 'a t -> 'a
end

The intended semantics of a vector is that of normal arrays with the addition that its length can only change via a sequence of push and pop operations, subject to the following laws:

  1. length (make ()) ≡ 0
  2. pop (make ()) ≡ ⊥
  3. push v x; pop v ≡ x
  4. push v x; pop v; v ≡ v
  5. push v (pop v); v ≡ v, if length v > 0
  6. push v x; get v (length v - 1) ≡ x
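These laws can be spot-checked against a deliberately naive model. The following list-backed VecModel is a hypothetical stand-in of my own, boxed and O(n) per operation (and omitting set for brevity); the efficient array-backed version comes later:

```ocaml
(* A reference model of Vec: a list holding the newest element first. *)
module VecModel : sig
  type 'a t
  val make   : unit -> 'a t
  val length : 'a t -> int
  val get    : 'a t -> int -> 'a
  val push   : 'a t -> 'a -> unit
  val pop    : 'a t -> 'a
end = struct
  type 'a t = 'a list ref
  let make () = ref []
  let length v = List.length !v
  (* index 0 is the oldest element, as in an array *)
  let get v i = List.nth !v (length v - 1 - i)
  let push v x = v := x :: !v
  let pop v = match !v with
  | []      -> raise Not_found   (* law 2: pop on empty is undefined *)
  | x :: xs -> v := xs; x
end
```

A few pushes and pops suffice to exercise laws 1, 3, 4 and 6 directly.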

Given Vec as a building block, a binary heap implementing a priority queue has the following signature:

module Heap : sig
  type 'a t
  val make       : ('a -> 'a -> int) -> 'a t
  val is_empty   : 'a t -> bool
  val min        : 'a t -> 'a
  val replacemin : 'a t -> 'a -> unit
  val removemin  : 'a t -> unit
  val insert     : 'a t -> 'a -> unit
end (* … *)

Heaps are fully polymorphic since when made they are associated with a comparison function:

end = struct
  type 'a t = { compare : 'a -> 'a -> int; heap : 'a Vec.t; }

  let make compare = { compare = compare; heap = Vec.make (); }

A new heap is backed by an empty vector. The first crucial operation is siftup: it bubbles up the last element (the one with index length h.heap - 1), comparing and swapping it with its parent until the heap property is restored, namely, until it becomes less than or equal to any of the children it has encountered on its voyage up the tree:

  open Vec

  let siftup h =
    let rec go cmp v e i =
      let p = (i + 1) / 2 - 1 in
      if p >= 0 && cmp (get v p) e > 0 then begin
        set v i (get v p);
        go cmp v e p
      end else set v i e
    in
    let n = length h.heap in
    go h.compare h.heap (get h.heap (n - 1)) (n - 1)

Note that the parent of a child with index 1 ≤ i < N has index ⌊i/2⌋, but since vectors are 0-based it is necessary to adjust for negative underflow under truncating integer division (with the usual machine-integer semantics, -1 / 2 == 0, which is not correct here). Siftup keeps comparing the node e at index i being inserted with its parent at index p: if it exists and is greater than e they get swapped, and the process is repeated until e becomes the new root or finds a parent with a smaller value. Since the tree is complete, it has exactly ⌊lg N⌋ + 1 levels, which bounds the number of iterations of this procedure. The iteration is implemented as a tail-recursive lambda-lifted function go for efficiency. The complementary siftdown is completely symmetric to it:

  let siftdown h =
    let rec go cmp v n e i =
      let c =
        let l = i * 2 + 1 in
        let r = l + 1 in
        if r < n && cmp (get v l) (get v r) > 0 then r else l
      in
      if c < n && cmp (get v c) e < 0 then begin
        set v i (get v c);
        go cmp v n e c
      end else set v i e
    in go h.compare h.heap (length h.heap) (get h.heap 0) 0

Instead of bubbling the last element up the tree, siftdown bubbles the first element down. It compares the node e at index i with the smaller, c, of its children 2·i and 2·i + 1 (again making allowances for the 0-based indexing), if they exist. If c exists and is less than e, they are swapped and the process is repeated until e becomes the last leaf or finds a pair of children with greater values. By the same analysis as before, this can happen at most ⌊lg N⌋ + 1 times. Again, the iteration is implemented as a tail-recursive lambda-lifted function go. With this done, the priority queue is implemented with a few more lines.

A heap is empty if and only if the underlying vector is:

  let is_empty h = length h.heap = 0

By the heap property, the minimum element is the root of the heap, if it exists:

  let min h =
    if is_empty h then raise Not_found;
    get h.heap 0

A low-level efficient operation is to replace the minimum element by another, maintaining the heap property:

  let replacemin h x =
    if is_empty h then raise Not_found;
    set h.heap 0 x; siftdown h

In order to remove the minimum element another one must be found in its place. This decreases the length of the heap by one, which means that the easiest-to-access candidate is pop h.heap. I don't bother with a guard as by law 2 above pop is partial; however, I must be careful not to try to set the last element into an empty array. This element is placed at the root and sifted down as with replacemin:

  let removemin h =
    let x = pop h.heap in
    if not (is_empty h) then begin
      set h.heap 0 x; siftdown h
    end

Finally, the dual operation of inserting a new element into a heap boils down to placing it at the end (cf law 6 above) and restoring the heap property by bubbling it up:

  let insert h x =
    push h.heap x; siftup h
end

It should be obvious that replacemin h x ≡ insert h x; removemin h, where the heap property is kept in suspense by eliminating the intervening siftup h. It only remains to complete the implementation of Vec. This is a completely straightforward amortized O(1) extensible array via doubling in expand. The only hack I use is a sprinkling of Obj.magic to minimize space leaks:

module Vec : sig (* … *) end = struct
  type 'a t = { mutable cnt : int; mutable len : int; mutable arr : 'a array }

  let make () = let len = 16 in
    { cnt = 0; len = len; arr = Array.make len (Obj.magic 0); }

  let length a = a.cnt

  let expand a =
    assert (a.len = a.cnt);
    a.len <- 2 * a.len;
    let arr = Array.make a.len (Obj.magic 0) in
    Array.blit a.arr 0 arr 0 a.cnt;
    a.arr <- arr;
    assert (a.cnt < a.len)

  let get a i =
    if not (0 <= i && i < a.cnt) then failwith "get";
    Array.unsafe_get a.arr i

  let set a i x =
    if not (0 <= i && i < a.cnt) then failwith "set";
    Array.unsafe_set a.arr i x

  let push a x =
    if a.len = a.cnt then expand a;
    Array.unsafe_set a.arr a.cnt x;
    a.cnt <- a.cnt + 1

  let pop a =
    if a.cnt = 0 then raise Not_found;
    a.cnt <- a.cnt - 1;
    let x = Array.unsafe_get a.arr a.cnt in
    Array.unsafe_set a.arr a.cnt (Obj.magic 0);
    x
end

With this, Bernstein's algorithm admits a straightforward implementation. Since it stores triples (y, a, b) where y = p(a) + q(b) for polynomials p and q I need to define a lexicographical ordering on integer triples:

let cint : int -> int -> int = Pervasives.compare

let c3int (p, m, b) (q, n, c) =
  let cmp = cint p q in
  if cmp <> 0 then cmp else
  let cmp = cint m n in
  if cmp <> 0 then cmp else
  cint b c

Now the algorithm takes an enumerating function f to be called with each element of the triple, up to a limit n:

let bernstein_sum p q (f : int -> int -> int -> unit) n =
  let tab = Array.init (n + 1) (fun a -> (p a, a)) in
  Array.sort (fun (p, _) (q, _) -> cint p q) tab;
  let pa, _ = tab.(0) in
  let h = Heap.make c3int in
  for b = 0 to n do
    Heap.insert h (pa + q b, 0, b)
  done;
  while not (Heap.is_empty h) do
    let (y, i, b) = Heap.min h in
    let (pa, a) = tab.(i) in
    if i < n
      then Heap.replacemin h (fst tab.(i + 1) - pa + y, i + 1, b)
      else Heap.removemin h;
    f a b y
  done

As Bernstein himself notes, for sufficiently simple p and q the table of values of p(a) can be dispensed with completely. In any case, the following example nicely exercises the algorithm:

# let sq i = i*i in bernstein_sum sq sq (Printf.printf "%2d^2 + %2d^2 = %d\n") 10 ;;