2020-03-30

Programming Algorithms: Synchronization

This is the final chapter of the book, in which we will discuss optimization of parallel computations: whether concurrently on a single machine in a shared-memory setting or in a distributed shared-nothing environment. This is a huge topic that spans synchronization itself, parallelization, concurrency, distributed computations, and the functional approach. And every senior software developer should be well-versed in it.

Usually, synchronization is studied in the context of system or distributed programming, but it has a significant algorithmic footprint and is also one of the hottest topics for new algorithm research. In fact, there are whole books that concentrate on it, but, usually, they attack the problem from other angles, not focusing on the algorithmic part. This chapter will be more algorithm-centered, although it will also present an overview of the problem space, so that, in the end, you'll have a good foundation to explore the topic further if a desire or need for that appears.

Let's start from the basics. In the previous chapters of the book, we mainly viewed algorithms as single computations running without external interruptions. This approach, obviously, removes unnecessary complexity, but it also isn't totally faithful to reality. Most of the programs we deal with now run in multiprocessing environments (sometimes, even distributed ones), and even when they don't utilize these capabilities, those are still available, they sometimes have their impact, and, besides, they might improve the performance of the programs if they were utilized. The majority of backend software, which currently consists of services running in datacenters, is multithreaded. There's a notorious "Zawinski's Law" that states that every program attempts to expand until it can read mail, and those programs which cannot so expand are replaced by ones which can. Being a good joke, it also reflects an important truth about the tendency of all programs over time to become network-aware, and thus distributed to at least some extent.

There are two principally different types of environments in which the programs that need synchronization run: shared-memory and shared-nothing ones.

In a shared-memory setting, there exists some shared storage (not necessarily RAM) that can be directly accessed by all the threads[1] of the application. Concurrent access to data in this shared memory is the principal source of the synchronization challenges, although not the only one. An example of a shared-memory program is a normal application that uses multithreading provided either directly by the OS or, more frequently, by the language runtime[2].

The opposite of shared-memory is a shared-nothing environment, in which the threads[3] don't have any common data storage and can coordinate only by sending messages directly to other processes. The contents of the messages have to be copied from the memory of the sender to that of the receiver. In this setting, some of the synchronization problems disappear, but others still remain: at the fundamental level, some synchronization or coordination still needs to happen. From a performance standpoint, however, the shared-nothing mode is usually inferior due to the need for additional data copying. So, both paradigms have their place, and the choice of which one to utilize depends on the context of a particular task.

The main goal of synchronization is ensuring program correctness when multiple computations are running in parallel. The other side of the coin is achieving optimal performance, which is also addressed by parallelization that we have touched upon in a couple of prior chapters. Prioritizing performance over correctness, although tempting, is one of the primary sources of bugs in concurrent systems. A trivial example would be building a shared-memory program without explicit use of any synchronization mechanisms. It is, definitely, the most performant approach, but uncoordinated access to the shared data will inevitably result in failures like data corruption.

Synchronization Troubles

So, let's talk in more detail about the most common synchronization problems that the methods we will discuss next are trying to handle. Such situations are called race conditions: multiple threads compete for the same resource — be it data storage or the processor — and, in the absence of special coordination, the order of execution is unspecified, which may result in unpredictable and unintended outcomes. There are two main results of this unpredictability (often, both occur simultaneously):

  • data corruption or loss
  • incorrect order of execution up to total failure of the program

Here is the simplest code segment that is amenable to data corruption in multithreaded programs:

(incf i)

It seems like there's just one operation involved — how can there be a race condition? From the point of view of a programmer in a high-level language, indeed, we deal with a single operation, but if we go deeper to the level of machine code we'll see that it is not the case. The relevant assembly snippet will, probably, contain three instructions:

mov i, register
inc register
mov register, i

You've just seen one more convincing piece of evidence why every programmer should understand how the lower levels of their platform operate. :)

The issue is that modern processors can't directly modify data in RAM (our variable i). First, the data needs to be moved into a register, only then can some operation on it be performed by the CPU, and, finally, it needs to be put back where the high-level program can find it. If an interrupt occurs (we're talking about multithreaded execution in a single address space, in this context) after mov i, register, the current thread will remember the old value of i (let it be 42) and be put into a waitqueue. If another thread that wants to change i is given processor time next, it may set it to whatever value it wants and continue execution (suppose it will be 0). However, when the turn returns to the first thread, it will increment the value it remembered (42), so i will change its value in the following sequence: 42 - 0 - 43. This is hardly the expected behavior.
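
To see this lost-update effect in practice, here is a minimal sketch (assuming the portable bordeaux-threads library, nicknamed bt, is loaded) that hammers a shared counter from two threads without any synchronization. On most runs, the final value will be noticeably less than 2000000:

(defparameter *counter* 0)

(defun lost-updates-demo ()
  ;; two threads, each performing a million unsynchronized increments
  (let ((threads (loop :repeat 2
                       :collect (bt:make-thread
                                 (lambda ()
                                   (loop :repeat 1000000
                                         :do (incf *counter*)))))))
    (mapc 'bt:join-thread threads)
    *counter*))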

Such data corruption will only impact the mentioned variable and may not cause catastrophic failures in the program. Its behavior will be incorrect, but in some situations that can be tolerated (for instance, if we gather some statistics, and occasional off-by-one errors will not be noticed). Yet, if i was some counter that impacts the core behavior of the program, it might easily lead to a catastrophe.

Ultimately, incorrect execution order should be considered the root cause of all synchronization problems. And here it is also manifest: we expected the increment to be a single (atomic) operation and thus to finish execution before anything else happens to i.

What are some other common cases of execution order errors? The most well-known and dreaded race condition is a deadlock. It is a situation of mutual blocking among two or more threads. Here is the simplest illustration of how it can occur:

thread 1 ---> acquire resource1 --> try to acquire resource2
thread 2 --> acquire resource2 ------> try to acquire resource1

In other words, two threads need exclusive access to two resources, but the order of access is opposite and the timing of the operations is such that the first thread manages to acquire access to the first resource while the second acquires the second. After that, the deadlock is inevitable and both threads will be blocked as soon as they try to access the other resource. The period between a thread acquiring a resource for exclusive access and the release of this resource is called a critical section of the program. A synchronization issue may manifest only in the critical sections.

The only way to untangle such a deadlock situation "from within" is for one of the threads to release the resource it already holds. Another approach, which requires external intervention, is often employed in database management systems: deadlock monitoring. A separate thread periodically examines blocked threads to check for conditions that signify a deadlock situation and resets the threads that were spotted in such a condition. Yet, instead of trying to fix deadlock situations, it may be better to prevent them from occurring altogether. The prevention techniques may utilize time-limited exclusive leases on resources or mandate that the threads acquire resources in a specific order. However, such approaches are limited and don't cover all the use cases. It would be nice to find some way to totally exclude deadlocks, but we should remember that the original reason why they may occur at all is the need to prevent data corruption in the case of uncontrolled access to the data. Exclusive access to the resource ensures that this problem will not occur, but results in the possibility of a deadlock, which is a comparatively lesser evil.

A livelock is a dynamic counterpart to deadlock which occurs much more rarely. It is a situation when threads don't constantly hold the resources exclusively (for instance, they might release them after a timeout), but the timing of the operations is such that, at the time when a resource is needed by one thread, it happens to be occupied by the other, and, ultimately, mutual blocking still occurs.

One more obnoxious race condition is priority inversion — a phenomenon one can frequently observe in real life: when a secondary lane of cars merges into the main road, but, for some extraneous reason (a traffic light malfunctioning, an accident that is blocking part of the road, etc.), the cars from it get more time to merge than the ones on the main road get to progress. Priority inversion may be the reason for a more severe problem, which is starvation — a situation when the execution of a thread is stalled as it can't access the resource it needs. Deadlocks result in starvation of all the involved threads, but the issue may occur in other conditions, as well. I would say that starvation or, more generally, underutilization is the most common performance issue of multithreaded applications.

Low-Level Synchronization

I hope, in the previous section, the importance of ensuring proper execution order in the critical sections of the program was demonstrated well enough. How to approach this task? There are many angles of attack. Partially, the problem may be solved by the introduction of atomic operations. Atomic increment/decrement is a common example of those, which may be found in the ecosystems of the majority of programming languages. For instance, SBCL provides an sb-ext:atomic-incf macro that operates on the fixnum slots of structures, array cells, contents of cons pairs or global variables. Some other languages, like Java, provide AtomicInteger and similar structures that guarantee atomic operations on their main slot.
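
As a quick illustration, here is a hedged sketch of using SBCL's atomic increment on a structure slot. The exact requirements on the place are implementation- and version-specific (on a 64-bit SBCL, a slot declared as sb-ext:word is accepted), so treat this as an illustration rather than portable code:

(defstruct hit-counter
  ;; sb-ext:word is a machine-word-sized unsigned integer type
  (value 0 :type sb-ext:word))

(defparameter *hits* (make-hit-counter))

(defun register-hit ()
  ;; increments the slot atomically, without any explicit locking
  (sb-ext:atomic-incf (hit-counter-value *hits*)))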

What enables atomic operations are special hardware instructions:

  • TSL — test-and-set lock
  • CAS — compare-and-swap
  • LL/SC — load-link/store-conditional

The most widespread of them is CAS, which has the same effect as if the following code worked as a single atomic operation:

(defmacro cas (place old new)
  `(when (eql ,place ,old)
     (:= ,place ,new)))

Based on this spec, we could define atomic-incf using cas:

(defmacro atomic-incf (place &optional (i 1))
  (let ((cur (gensym "CUR"))
        (rez (gensym "REZ")))
    `(loop :for ,rez := (let ((,cur ,place))
                          (cas ,place ,cur (+ ,cur ,i)))
           :when ,rez :do (return ,rez))))

Here, we read the current value of place, and then try to set it with cas. These two operations happen non-atomically, so there's a chance that cas will return nil. In that case, we redo the whole sequence again. It is clear that the execution time of such an operation is non-deterministic, but, in a reasonably configured multithreaded system, there should be, generally, just a single chance for cas to fail: when the thread is preempted between the assignment and the cas. It shouldn't fail the second time this thread gets its time slice, for the thread will start from these two operations and should have enough time to complete both of them.

Another important low-level instruction is a memory barrier. It causes the CPU to enforce an ordering constraint on memory operations issued before and after the barrier instruction. I.e. the operations issued prior to the barrier are guaranteed to be performed before operations issued after the barrier. Memory barriers are necessary because most modern CPUs employ performance optimizations that can result in out-of-order execution. The reordering of memory loads and stores goes unnoticed within a single thread of execution but can cause unpredictable behavior in concurrent programs. One more leak from the low level adding to the list of synchronization worries...

On top of CAS and atomic operations, some higher-level synchronization primitives are provided by the OS and the execution runtimes of most programming languages. The most popular of them is the semaphore. It is a counter that is initially set to the number of threads that can proceed past querying its value. If the counter is above zero, the thread may continue execution, but it also atomically decrements the counter. This operation is usually called wait, acquire or down on the semaphore. However, if the counter is already down to zero, the thread goes to sleep and is put into an OS waitqueue until a wakeup notification arrives. The notification is initiated by some thread calling release/up on the same semaphore. This operation atomically increments the counter value and also allows some of the waiting threads to continue execution. The most used type of semaphore is the mutex, which allows only a single thread to enter and also mandates that the implementation check that the thread releasing the mutex is the one that previously acquired it. There are also other types of semaphores or more complex locks built on top of them, such as the read-write lock or the monitor.
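
For illustration, here is a minimal sketch of guarding the *counter* variable from the earlier sketch with a mutex, again assuming bordeaux-threads. Only one thread at a time can execute the body of with-lock-held, so the lost updates disappear at the cost of acquiring the lock on every increment:

(defparameter *counter-lock* (bt:make-lock "counter-lock"))

(defun safe-increment ()
  (bt:with-lock-held (*counter-lock*)
    (incf *counter*)))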

Semaphores are an alternative to a lower-level spin-lock primitive that uses busy waiting, i.e. constant checking of the counter variable until it rises above zero. Another, more general, name for this method is polling, which refers to constantly querying the state of some resource (a lock, a network socket, a file descriptor) to know when its state changes. Polling has both drawbacks and advantages: it occupies the thread instead of yielding the CPU to other workers, which is a serious downside, but it also avoids the expensive OS context switching required by semaphores.
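
To connect this with the earlier discussion, here is a toy spin-lock built on top of the cas spec from above. It is not production code: a real implementation would also need memory barriers and should, eventually, yield the CPU instead of spinning forever:

(defmacro with-spin-lock ((place) &body body)
  `(progn
     ;; busy-wait until we manage to flip the flag from nil to t
     (loop :until (cas ,place nil t))
     (unwind-protect (progn ,@body)
       ;; release the lock even if the body exits abnormally
       (:= ,place nil))))

;; usage sketch: a global flag serves as the lock
(defparameter *lock* nil)

(defun spin-locked-increment ()
  (with-spin-lock (*lock*)
    (incf *counter*)))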

So, both semaphores and spin-locks find their place. In low-level OS code, spin-locks prevail, while semaphores are the default synchronization primitive in user space.

Mutual Exclusion Algorithms

Relying on hardware features for synchronization is a common approach taken by most software systems. However, since the beginning of work on this problem, computer scientists, including such famous algorithmists as Dijkstra and Lamport, have proposed mutual exclusion algorithms that allow guarding the critical sections without any special support from the platform. One of the simplest of them is Peterson's algorithm. It guarantees mutual exclusion of two threads with the use of two variables: a two-element array interest and a variable turn that holds the index of the thread to which priority is yielded. A true value of the interest item corresponding to a thread indicates that it wants to enter the critical section. Entrance is granted if the second thread either does not want the same or has yielded priority to the first thread.

(defparameter *interest* (vec nil nil))
(defparameter *turn* nil)

(defun peterson-call (i fn)
  (let ((other (abs (1- i))))
    (:= (? *interest* i) t
        *turn* other)
    ;; busy waiting
    (loop :while (and (? *interest* other) (= *turn* other))) 
    ;; critical section
    (call fn)
    (:= (? *interest* i) nil)))

The algorithm satisfies the three essential criteria to solve the critical section problem: mutual exclusion, progress, and bounded waiting.

Mutual exclusion means that several competing threads can never be in the critical section at the same time. For Peterson's algorithm, if thread 0 is in its critical section, then (? *interest* 0) is true. In addition, either (? *interest* 1) is nil (meaning thread 1 has left its critical section and isn't interested in coming back into it), or *turn* is 0 (meaning that thread 1 is just now trying to enter the critical section but is waiting), or thread 1 is trying to enter its critical section after setting (? *interest* 1) to true but before setting *turn* to 0. So if both threads are in the critical section, then we conclude that the state must satisfy (and (? *interest* 0) (? *interest* 1) (= *turn* 0) (= *turn* 1)), which is, obviously, impossible. I.e. only one of the threads could have entered the section.

The condition of progress, basically, says that only those threads that wish to enter the critical section can participate in making the decision as to which one will do it next, and that this selection cannot be postponed indefinitely. In our case, a thread cannot immediately reenter the critical section if the other thread has set its interest flag. Thus the thread that has just left the critical section will not impact the progress of the waiting thread.

Bounded waiting means that the number of times a thread is bypassed by another thread after it has indicated its desire to enter the critical section is bounded by a function of the number of threads in the system. In Peterson's algorithm, a thread will never wait longer than one turn for entrance to the critical section.

The drawback of Peterson's algorithm is busy waiting[4], so it may be compared to a spin-lock. There are a number of other similar algorithms, including Dekker's and Lamport's, which also share this property. The newer Szymański's algorithm is designed to avoid busy waiting, but it requires access to the OS scheduling facilities to make the thread sleep while waiting for the wakeup call, making the algorithm similar to semaphores.
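
As a usage sketch (and nothing more), here is how two threads, indexed 0 and 1, could funnel their increments through peterson-call, assuming bordeaux-threads for thread creation. Note that on modern out-of-order hardware this only works reliably if the flag and turn accesses are fenced with the memory barriers discussed above:

(defun peterson-demo ()
  (let ((counter 0))
    (mapc 'bt:join-thread
          (loop :for i :from 0 :to 1
                :collect (let ((i i))  ; capture the thread index
                           (bt:make-thread
                            (lambda ()
                              (loop :repeat 100000
                                    :do (peterson-call
                                         i (lambda () (incf counter)))))))))
    counter))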

High-Level Synchronization

All the mentioned synchronization primitives don't solve the challenges of synchronization completely. Rather, they provide tools that enable reasonable solutions but still require advanced understanding and careful application. The complexity of multithreaded programs is a level up compared to their single-threaded counterparts, and thus a lot of effort continues to be spent on trying to come up with high-level ways to contain it, i.e. to remove it from the sight of a regular programmer by providing primitives that handle synchronization behind the scenes. A simple example of that is Java synchronized classes that employ an internal monitor to ensure atomic access to the slots of a synchronized object. The major problem with regular locks (like semaphores) is that working with them brings us into the realm of global state manipulation. Such locking can't be isolated within the boundaries of a single function — it leaks through the whole caller chain, and this makes the program much harder to reason about. In this regard, it is somewhat similar to the use of goto, albeit on a larger scale, and so the push for higher-level synchronization facilities resembles Dijkstra's famous appeal to introduce structured programming ("goto considered harmful"). Ironically, Dijkstra is one of the creators of the classic synchronization mechanisms that are now frowned upon. However, synchronization has intrinsic complexity that can't be fully contained, so no silver bullet exists (and hardly will ever be created), and every high-level solution will be effective only in a subset of cases.

I have seen this first-hand when teaching a course on system programming and witnessing how students solve the so-called classic synchronization problems. The task was to apply both classic synchronization techniques (semaphores et al.) and the new high-level ones (using Erlang, Haskell, Clojure or Go, which all provide some of those). The outcome, in terms of complexity, was not always in favor of the new approaches.

There are a number of these classic synchronization problems, and I was even collecting them to be able to provide more variants of the tasks to diminish cheating. :) But, in essence, they all boil down to just a few archetypal cases: producer-consumer, readers-writers, sleeping barber, and the dining philosophers. Each problem demonstrates a certain basic synchronization scenario and allows the researchers to see how their approach will handle it. I won't include them in the book but strongly encourage anyone interested in this topic to study them in more detail and also try to solve them using different synchronization mechanisms.

Now, let's talk about some of the prominent high-level approaches. Remember that they try to change the paradigm and avoid the need for explicit locking of critical sections altogether.

Lock-free Data Structures

My favorite among them is lock-free data structures. This is a simple and effective idea that can help deal with many common use cases and, indeed, avoid the necessity for explicit synchronization. Still, their use is limited and, obviously, can't cover all the possible scenarios.

The most important among them is arguably a lock-free queue. It can be implemented in different ways, and there's a simple and efficient implementation using cas provided by SBCL in the SB-CONCURRENCY contrib package. Here is the implementation of the main operations (taken from the SBCL source code and slightly simplified):

;; sentinel markers used by the dequeue operation below (in the actual
;; SBCL source, these are dedicated constants; any two unique objects
;; will do for this simplified version)
(defconstant +dead-end+ :dead-end)
(defconstant +dummy+ :dummy)

(defstruct lf-queue
  (head (error "No HEAD.") :type cons)
  (tail (error "No TAIL.") :type cons))

(defun lf-enqueue (value queue)
  (let ((new (cons value nil)))
    (loop (when (eq nil (sb-ext:compare-and-swap (cdr (lf-queue-tail queue))
                                                 nil new))
            (:= (lf-queue-tail queue) new)
            (return value)))))

(defun lf-dequeue (queue)
  (loop (with ((head (lf-queue-head queue))
               (next (cdr head)))
          (typecase next
            (null (return (values nil
                                  nil)))
            (cons (when (eq head (sb-ext:compare-and-swap (lf-queue-head queue)
                                                          head next))
                    (let ((value (car next)))
                      (setf (cdr head) +dead-end+
                            (car next) +dummy+)
                      (return (values value
                                      t)))))))))

The value of this structure lies in the fact that it enables the implementation of the master-worker pattern, which is a backbone of many backend applications, as well as, in general, different forms of lock-free and wait-free coordination between the running threads. Basically, it's a lock-free solution to the producer-consumer problem. The items are put in the queue by some producer threads (masters) and consumed by the worker threads. Such an architecture allows the programmer to separate concerns between different layers of the application: for instance, one type of thread may be responsible for handling incoming connections and, in order to ensure high availability of the system, these threads shouldn't spend much time processing them. So, after some basic processing, the connection sockets are put into the queue, from which the heavy-lifting worker threads can consume them and process them in a more elaborate fashion. I.e. it's a job queue for a thread pool. Surely, a lock-based queue may also be utilized as an alternative in these scenarios, but the necessity to lock from the caller's side makes the code for all the involved threads more complicated: what if a thread that has just acquired the lock is abruptly terminated for some reason?
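
Here is a rough sketch of such a job queue built directly on the SB-CONCURRENCY API (make-queue/enqueue/dequeue, where dequeue returns the element and a success flag) and sb-thread. The run-pool name is ours, and, for brevity, the workers simply poll the queue and exit when it is empty; a production version would block on a semaphore or a waitqueue instead of spinning:

(defun run-pool (jobs n-workers handler)
  (let ((queue (sb-concurrency:make-queue)))
    ;; the master side: enqueue all the jobs
    (dolist (job jobs)
      (sb-concurrency:enqueue job queue))
    ;; the worker side: dequeue and process until the queue is drained
    (mapc 'sb-thread:join-thread
          (loop :repeat n-workers
                :collect (sb-thread:make-thread
                          (lambda ()
                            (loop (multiple-value-bind (job foundp)
                                      (sb-concurrency:dequeue queue)
                                    (if foundp
                                        (funcall handler job)
                                        (return))))))))))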

Data-Parallelism and Message Passing

Beyond thread pools, there's a whole concept of data parallelism, which, in essence, lies in submitting different computations to the pool and implementing synchronization as an orchestration of those tasks. In addition, node.js and Go use lock-free IO in conjunction with such thread pools (and special syntax for its seamless integration) for an efficient implementation of user-space green threads to support this paradigm.

Even further along this direction is Erlang, which is a whole language built around lock-free IO, efficient user-space threading, and a shared-nothing memory model. It is the language of message-passing concurrency that aims to solve all synchronization problems within this single approach. As discussed in the beginning, such a stance has its advantages and drawbacks, and so Erlang fits some problems (like coordination between a large number of simple agents) exceptionally well, while for others it imposes unaffordable costs in terms of both performance and complexity.

I won't go deeper into these topics as they are not directly related to the matter of this book.

STM

Another take on concurrency is a technology that has been used, for quite a long time, in database systems, and was reimplemented in several languages, being popularized by the author of Clojure — Software Transactional Memory (STM). The idea is to treat all data accesses in memory as parts of transactions — computations that possess the ACID properties: atomicity, consistency, and isolation (minus durability, which is only relevant to the database systems persisting data on disk). These transactions should still be initiated by the programmer, so the control over synchronization remains, to a large extent, in their hands, with some portion of the associated complexity. The transactions may be implemented in different ways, but they will still use locking behind the scenes, and there are two main approaches to applying locking:

  • pessimistic — when the locks are acquired for the whole duration of the transaction, basically, making it analogous to the very conservative programming style of acquiring all the locks at once and only then entering the critical section, which avoids deadlocks but seriously hinders program performance; in the context of STM, each separate variable will have its own lock
  • optimistic — when the initial state of the transaction variables is remembered in the thread-local storage, and locking occurs only at the last (commit) phase, when all the changes are applied — but only if there were no external changes to the transaction variables; if at least one of them has changed, the whole transaction needs to be rolled back and retried

In both cases, the main issue is the same: contention. If the number of threads competing for the locks is small, an optimistic approach should perform better, while, in the opposite case, there will be too many rollbacks and even a possibility of a livelock.
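
To make the optimistic flavor more concrete, here is a toy sketch for the degenerate case of a transaction over a single variable, built on the cas spec from before: snapshot the value, run the (side-effect-free) body on the snapshot, and commit with cas, retrying the whole thing if somebody else has changed the variable in the meantime. Real STM implementations, of course, track read and write sets spanning many variables:

(defmacro with-optimistic-update ((var place) &body body)
  (let ((old (gensym "OLD"))
        (new (gensym "NEW")))
    `(loop
       (let* ((,old ,place)
              (,var ,old)
              (,new (progn ,@body)))
         ;; commit only if nobody has changed the place since we read it
         (when (cas ,place ,old ,new)
           (return ,new))))))

;; usage sketch: apply 10% interest to a shared balance
;; (with-optimistic-update (balance *balance*)
;;   (round (* balance 11/10)))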

Optimistic transactions are usually implemented using the Multiversion Concurrency Control (MVCC) mechanism. MVCC ensures a transaction never has to wait to read an object by maintaining several versions of this object. Each version has both a Read Timestamp and a Write Timestamp, which lets a particular transaction read the most recent version of the object that precedes the transaction's own Read Timestamp.

STM is an interesting technology, which hasn't yet proven its case beyond the distinct area of data management systems, such as RDBMSs and their analogs.

Distributed Computations

So far, we have discussed synchronization mainly in the context of software running in a single address space on a single machine. Yet, the same issues, although magnified, are also relevant to distributed systems. Actually, the same models of computation apply: shared-memory and shared-nothing message passing. Although, for distributed computing, message passing becomes much more natural, while the significance of shared memory is seriously diminished and the "memory" itself becomes some kind of a network storage system like a database or a network file system.

However, more challenges are imposed by the introduction of the unreliable network as a communication environment between the parts of a system. These challenges are reflected in the so-called "fallacies of distributed computing":

  • the network is reliable
  • latency is zero
  • bandwidth is infinite
  • the network is secure
  • topology doesn't change
  • there is a single administrator
  • transport cost is zero
  • the network is homogeneous
  • clocks on all nodes are synchronized

Another way to summarize those challenges, which is the currently prevailing view, is the famous Brewer's CAP Theorem, which states that any distributed system may have only two of the three desired properties at once: consistency, availability, and partition tolerance. And since partition tolerance is a required property of any network system, as it's the ability to function in an unreliable network environment (which is the norm), the only possible distributed systems are CP and AP, i.e. they either guarantee consistency but might be unavailable at times, or they are constantly available but might sometimes be inconsistent.

Distributed Algorithms

Distributed computation requires distributed data structures and distributed algorithms. The domains that are in active development are distributed consensus, efficient distribution of computation, and efficient change propagation. Google pioneered the area of efficient network computation with the MapReduce framework that originated from the ideas of functional programming and Lisp, in particular. Next-generation systems such as Apache Spark develop these ideas even further.

Yet, the primary challenge for distributed systems is efficient consensus. The addition of the unreliable network makes the problem nontrivial compared to a single-machine variant where the consensus may be achieved easily in a shared-memory setting. The world has seen an evolution of distributed consensus algorithms implemented in different data management systems, from the 2-Phase Commit (2PC) to the currently popular RAFT protocol.

2PC is an algorithm for coordination of all the processes that participate in a distributed atomic transaction on whether to commit or rollback the transaction. The protocol achieves its goal even in many cases of temporary system failure. However, it is not resilient to all possible failure configurations, and in rare cases, manual intervention is needed. To accommodate recovery from failure, the participants of the transaction use logging of states, which may be implemented in different ways. Though usually intended to be used infrequently, recovery procedures compose a substantial portion of the protocol, due to many possible failure scenarios to be considered.

In a "normal execution" of any single distributed transaction, the 2PC consists of two phases:

  1. The "commit-request" or voting phase, in which a coordinator process attempts to prepare all the participating processes to take the necessary steps for either committing or aborting the transaction and to vote, either "Yes" (commit) or "No" (abort).
  2. The "commit" phase, in which, based on the voting of the participants, the coordinator decides whether to commit (only if all have voted "Yes") or rollback the transaction and notifies the result to all the participants. The participants then follow with the needed actions (commit or rollback) with their local transactional resources.

It is clear, from the description, that 2PC is a centralized algorithm that depends on the authority and high availability of the coordinator process. Centralized and peer-to-peer are the two opposite modes of the network algorithms, and each algorithm is distinguished by its level of centralization.

The 3PC is a refinement of the 2PC which is supposed to be more resilient to failures by introducing an intermediate stage called "prepared to commit". However, it doesn't solve the fundamental challenges of the approach that stem from its centralized nature, only making the procedure more complex to implement and thus giving it more failure modes.

The modern peer-to-peer coordination algorithm alternatives are Paxos and RAFT. RAFT is considered to be a simpler (and, thus, more reliable) approach. It is also, not surprisingly, based on voting. It adds a preliminary phase to each transaction, which is leader election. The election, as well as other activities within a transaction, don't require unanimous agreement, but a simple majority. Besides, execution of all the stages on each machine is timeout-based, so if a network failure or a node failure occurs, the operations are aborted and retried with an updated view of the other peers. The details of the algorithm can be best understood from the RAFT website, which provides a link to the main paper, good visualizations and other references.

Distributed Data Structures

We have already mentioned various distributed hash-tables and content-addressable storage as examples of these types of structures. Another exciting and rapidly developing direction is eventually-consistent data structures or CRDTs (Conflict-free Replicated Data Types). They are the small-scale representatives of the AP (or eventually-consistent) systems that favor high availability over constant consistency, as such systems become more and more the preferred mode of operation for distributed systems.

The issue that CRDTs address is conflict resolution when different versions of the structure appear due to network partitions and their eventual repair. For a general data structure, if there are two conflicting versions, the solution is either to choose one (according to some general rule, like taking a random one or the latest one, or some application-specific logic) or to keep both versions and defer conflict resolution to the client code. CRDTs are conflict-free, i.e. the structures are devised so that any conflict is resolved automatically in a way that doesn't bring any data loss or corruption.

There are two ways to implement CRDTs: convergent structures rely on the replication of the whole state, while commutative ones use operation-based replication. Yet, both strategies result in CRDTs with equivalent properties.

The simplest CRDT is a G-Counter (where "G" stands for grow only). Its operation is based on the trivial fact that addition is commutative, i.e. the order of applying the addition operation doesn't matter: we'll get the same result as long as the number of operations is the same. Every convergent CRDT has a merge operation that combines the states of each node. On each node, the G-Counter stores an array that holds the per-node numbers of the local increments. And its merge operation takes the maximums of the elements of this array across all nodes while obtaining the value of the counter requires summing all of the cells:

(defstruct (g-counter (:conc-name nil))
  ccs)

(defun make-gcc (n)
  (make-g-counter :ccs (make-array n :initial-element 0)))

(defun gcc-val (gcc)
  (reduce '+ (ccs gcc)))

(defun gcc-merge (gcc1 gcc2)
  (make-g-counter :ccs (map 'vector 'max (ccs gcc1) (ccs gcc2))))
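
The only missing piece is the local update: each node only ever increments its own cell (identified here by a node-id index), so no coordination is needed at write time. A hedged addition to the sketch above:

(defun gcc-inc (gcc node-id &optional (n 1))
  (incf (aref (ccs gcc) node-id) n))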

The structure is eventually consistent as, at any point in time, by asking any live node, we can get the current value of the counter from it (so there's constant availability). However, if not all changes have yet been replicated to this node, the value may be smaller than the actual one (so consistency is only achieved eventually, once all the replications are over).

The next step is a PN-Counter (positive-negative). It uses a common strategy in CRDT creation: combining several simpler CRDTs. In this case, it is a combination of two G-Counters: one for the number of increments, and another for the decrements.
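
A minimal sketch of such a combination, reusing the G-Counter functions defined above (the pnc- names are ours, for illustration only):

(defstruct (pn-counter (:conc-name pnc-))
  inc
  dec)

(defun make-pnc (n)
  (make-pn-counter :inc (make-gcc n) :dec (make-gcc n)))

(defun pnc-val (pnc)
  (- (gcc-val (pnc-inc pnc)) (gcc-val (pnc-dec pnc))))

(defun pnc-merge (pnc1 pnc2)
  (make-pn-counter :inc (gcc-merge (pnc-inc pnc1) (pnc-inc pnc2))
                   :dec (gcc-merge (pnc-dec pnc1) (pnc-dec pnc2))))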

A set is, in some sense, a more sophisticated analog of a counter (a counter may be considered a set of 1s). So, a G-Set functions similarly to a G-Counter: it allows each node to add items to the set that are stored in the relevant cell of the main array. The merging and value retrieval operations use union. Similarly, there's the 2P-Set (2-phase) that is similar in construction to the PN-Counter. The difference of a 2P-Set from a normal set is that once an element is put into the removal G-Set (called the "tombstone" set) it cannot be re-added to the set. I.e. addition may be undone, but deletion is permanent. This misfeature is amended by the LWW-Set (last-write-wins) that adds timestamps to all the records. Thus, an item with a more recent timestamp prevails, i.e. if an object is present in both underlying G-Sets, it is considered present in the set if its timestamp in the addition set is greater than the one in the removal set, and removed in the opposite case.
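
For illustration, here is a toy 2P-Set membership check built on two plain hash-table-backed sets. The per-replica layout and the merge operation (which takes the union of the corresponding sets across replicas) are omitted for brevity:

(defstruct 2p-set
  (added (make-hash-table :test 'equal))
  (removed (make-hash-table :test 'equal)))

(defun 2p-add (set item)
  (:= (gethash item (2p-set-added set)) t))

(defun 2p-remove (set item)
  ;; the "tombstone" set: once here, the item can never come back
  (:= (gethash item (2p-set-removed set)) t))

(defun 2p-member (set item)
  (and (gethash item (2p-set-added set))
       (not (gethash item (2p-set-removed set)))))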

There are also more complex CRDTs used to model sequences, including Treedoc, RGA, Woot, Logoot, and LSEQ. Their implementations differ, but the general idea is that each character (or chunk of characters) is assigned a key that can be ordered. When new text is added, it's given a key that is derived from the key of some adjacent text. As a result, the merge is the best-possible approximation of the intent of the edits.

The use cases for CRDTs are, as mentioned above, collaborative editing, maintaining such structures as shopping carts (e.g. with an LWW-Set), counters of page visits to a site or reactions in a social network, and so on and so forth.

Distributed Algorithms in Action: Collaborative Editing

In fact, CRDTs are a data-structure-centric answer to another technology that has been used, for quite some time, to support collaborative editing: Operational Transformation (OT). OT was employed in such products as Google Docs and its predecessors to implement lock-free simultaneous rich-text editing of the same document by many actors.

OT is an umbrella term that covers a whole family of algorithms sharing the same basic principles. Such systems use replicated document storage, i.e. each node in the system operates on its own copy in a non-blocking manner as if it was a single-user scenario. The changes from every node are constantly propagated to the rest of the nodes. When a node receives a batch of changes it transforms the changes before executing them to account for the local changes that were already made since the previous changeset. Thus the name "operational transformation".

The basic idea of OT can be illustrated with the following example. Let's say we have a text document with a string "bar" replicated by two nodes and two concurrent operations:

(insert 0 "f") # o1 on node1
(delete 2 "r") # o2 on node2

Suppose, on node 1, the operations are executed in the order o1, o2. After executing o1, the document changes to "fbar". Now, before executing o2, we must transform it against o1 according to the transformation rules. As a result, it will change to (delete 3 "r"). So, the basic idea of OT is to adjust (transform) the parameters of incoming editing operations to account for the effects of the previously executed concurrent operations (whether they were invoked locally or received from some other node) so that the transformed operation can achieve the correct effect and maintain document consistency. The word "concurrent" here means operations that happened since some state that was recorded on the node that has sent the new batch of changes. The transformation rules are operation-specific.
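
For the single case from this example, the transformation rule can be sketched as follows (real OT systems define such a rule for every ordered pair of operation types):

(defun transform-delete-against-insert (del-pos ins-pos ins-len)
  ;; a delete that happens after an insert at a smaller or equal
  ;; position must be shifted right by the length of the insert
  (if (<= ins-pos del-pos)
      (+ del-pos ins-len)
      del-pos))

;; (transform-delete-against-insert 2 0 1) => 3, as in the example above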

In theory, OT seems quite simple, but it has its share of implementation nuances and issues:

  • While the classic OT approach of defining operations through their offsets in the text seems to be simple and natural, real-world distributed systems raise serious issues: namely, that operations propagate with finite speed (remember one of the network fallacies), states of participants are often different, thus the resulting combinations of states and operations are extremely hard to foresee and understand.
  • For OT to work, every single change to the data needs to be captured: "Obtaining a snapshot of the state is usually trivial, but capturing edits is a different matter altogether. The richness of modern user interfaces can make this problematic, especially within a browser-based environment."
  • The notion of a "point in time" relative to which the operations should be transformed is nontrivial to implement correctly (another network fallacy in play). Relying on global time synchronization is one of the approaches, but it requires tight control over the whole environment (which Google has demonstrated to be possible for its datacenters). So, in most cases, a distributed solution instead of simple timestamps is needed.

The most popular of these solutions is the vector clock (VC). The VC of a distributed system of n nodes is a vector of n logical clocks, one clock per process; a local "smallest possible values" copy of the global clock-array is kept in each process, with the following rules for clock updates:

  • Initially all clocks are zero.
  • Each time a process experiences an internal event, it increments its own logical clock in the vector by one.
  • Each time a process sends a message, it increments its own logical clock in the vector by one (as in the bullet above, but not twice for the same event) and then sends a copy of its own vector.
  • Each time a process receives a message, it increments its own logical clock in the vector by one and updates each element in its vector by taking the maximum of the value in its own vector clock and the value in the vector in the received message (for every element).

You might notice that the operation of vector clocks is similar to the CRDT G-counter.

VCs allow the partial causal ordering of events. A vector clock value for the event x is less than the value for y if and only if, for all indices, the items of x's clock are less than or equal to those of y's, and, for at least one element, they are strictly smaller.
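
Here is a minimal sketch of these rules and of the comparison; the function names are ours, chosen just for this illustration:

(defun make-vc (n)
  (make-array n :initial-element 0))

(defun vc-tick (vc i)
  ;; a local event (or a send) on process i
  (incf (aref vc i))
  vc)

(defun vc-receive (vc incoming i)
  ;; increment the own component, then take the element-wise maximum
  (vc-tick vc i)
  (map-into vc 'max vc incoming))

(defun vc<= (vc1 vc2)
  (every '<= vc1 vc2))

(defun vc< (vc1 vc2)
  (and (vc<= vc1 vc2)
       (notevery '= vc1 vc2)))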

Besides vector clocks, the other mechanisms to implement distributed partial ordering include Lamport Timestamps, Plausible Clocks, Interval Tree Clocks, Bloom Clocks, and others.

Persistent Data Structures

To conclude this chapter, I wanted to say a few words about the role of the functional paradigm in synchronization and distributed computing. It's no coincidence that it was mentioned several times in the descriptions of different synchronization strategies: essentially, functional programming is about achieving good separation of concerns by splitting computations into independent referentially-transparent units that are easier to reason about. Such an approach supports concurrency more natively than the standard imperative paradigm, although it might not be optimal computationally (at least, in the small). Yet, the gains obtained from parallelism and utilizing the scale of distributed computing may greatly outweigh this low-level inefficiency. So, with the advent of concurrent and distributed paradigms, functional programming gains more traction and adoption. Such ideas as MapReduce, STM, and message-passing-based coordination originated in the functional programming world.

Another technology coming from the functional paradigm that is relevant to synchronization is Purely Functional Data Structures. Their principal property is that any modification doesn't cause a destructive effect on the previous version of the structure, i.e., with each change, a new version is created, while the old one may be preserved or discarded depending on the particular program's requirements. This feature makes them very well suited for concurrent usage as the possibility of corruption due to incorrect operation order is removed, and such structures are also compatible with any kind of transactional behavior. The perceived inefficiency of constant copying may, in many cases, be mostly avoided by using structure sharing. So the actual cost of maintaining these data structures is not proportional to their size, but rather constant or, at worst, logarithmic in size. Another name for these structures is persistent data structures, in contrast to "ephemeral" ones which operate by destructive modification.

The simplest persistent functional structure is, as we already mentioned in one of the preceding chapters, a Lisp list[5] used as a stack. We have also seen the queue implemented with two stacks, called a Real-Time Queue. It is a purely functional data structure, as well. The other examples are mostly either list- or tree-based, i.e. they also use a linked backbone structured in a certain way.

To illustrate once again how most persistent data structures operate, we can look at a Zipper that may be considered a generalization of a Real-Time Queue. It is a technique of representing a data structure so that it is convenient for writing programs that traverse the structure arbitrarily and update its contents in a purely functional manner, i.e. without destructive operations. A list-zipper represents the entire list from the perspective of a specific location within it. It is a pair consisting of a record of the reversed path from the list start to the current location and the tail of the list starting at the current location. In particular, the list-zipper of a list (1 2 3 4 5), when created, will look like this: (() . (1 2 3 4 5)). As we traverse the list, it will change in the following manner:

  • ((1) . (2 3 4 5))
  • ((2 1) . (3 4 5))
  • etc.

If we want to replace 3 with 0, the list-zipper will become ((2 1) . (0 4 5)) while the previous version will still persist. The new zipper will reuse the list (2 1) and create a new list by consing 0 to the front of the sublist (4 5), which remains shared with the original. Consequently, after performing 2 movements and 1 update, most of the memory is shared between the old and the new versions.
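
A minimal list-zipper sketch matching this description (the names are illustrative):

(defun make-zipper (list)
  ;; the reversed prefix and the remaining tail
  (cons nil list))

(defun zipper-right (zipper)
  ;; move the focus one element to the right
  (cons (cons (first (cdr zipper)) (car zipper))
        (rest (cdr zipper))))

(defun zipper-replace (zipper value)
  ;; replace the element at the focus, sharing the prefix and the tail
  ;; beyond the focus with the previous version
  (cons (car zipper)
        (cons value (rest (cdr zipper)))))

;; (zipper-replace (zipper-right (zipper-right (make-zipper '(1 2 3 4 5)))) 0)
;; => ((2 1) 0 4 5), i.e. ((2 1) . (0 4 5))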

It is apparent that each operation on the zipper (movement or modification) adds at most a single additional element. So, its complexity is the same as for normal lists (although, with larger constants).

Zippers can operate on any linked structures. A very similar structure for trees is called a Finger Tree. To create it from a normal tree, we need to put "fingers" to the right and left ends of the tree and transform it like a zipper. A finger is simply a point at which you can access part of a data structure.

Let's consider the case of a 2-3 tree, for which the finger approach was first developed. First, we restructure the entire tree and make the parents of the first and last children the two roots of our tree. The resulting finger tree is composed of several layers that sit along the spine of the tree. Each layer of the finger tree has a prefix (on the left) and a suffix (on the right), as well as a link further down the spine. The prefix and suffix contain values in the finger tree: on the first level, they contain values (2-3 trees of depth 0); on the second level, they contain 2-3 trees of depth 1; on the third level, they contain 2-3 trees of depth 2, and so on. This somewhat unusual property comes from the fact that the original 2-3 tree was of uniform depth. The edges of the original 2-3 tree are now at the top of the spine. The root of the 2-3 tree is now the very bottom element of the spine. As we go down the spine, we are traversing from the leaves to the root of the original 2-3 tree; as we get closer to the root, the prefixes and suffixes contain deeper and deeper subtrees of the original 2-3 tree.

Now, the principle of operation (traversal or modification) on the finger tree is the same as with the zipper: with each change, some elements are tossed from one side of the spine to the other, and the number of such elements remains within the O(log n) limits.

Finally, another data structure that is crucial for the efficient implementation of systems that rely solely on persistent data structures (like the Clojure language environment) is a Hash-Array Mapped Trie (HAMT). It may be used both in ephemeral and persistent mode to represent maps and sets with O(log n) access complexity[6]. HAMT is a special trie that uses the following two tricks:

  • as an array-mapped trie, instead of storing pointers to the child nodes in a key-value table indexed by their subkeys, it stores an array of pointers and a bitmap that is used to determine whether a pointer is present and at what position in the array it resides. This feature requires limiting the number of possible subkeys (for example, individual characters, which are the dominant use case for tries) to the length of the bitmap. The default length is 32, which is enough to represent the English alphabet :)
  • however, the hash part gives us a number of benefits, including lifting the limitations on the subkeys. Actually, in a HAMT, all values are stored at the leaves that have the same depth, while the subkeys are obtained by, first, hashing the key and then splitting the obtained hash into n-bit ranges (where n is usually 5, matching the default bitmap length of 32)[7]. Each subkey is used as an index into the bitmap: if the bit at that index is 1, the corresponding pointer is present. To calculate the position of the pointer in the pointer array, we need to perform popcount on the preceding bits (a sketch of this indexing scheme follows below).
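
Here is a toy illustration of this indexing trick for a single HAMT node: a 32-bit bitmap plus a dense array of child pointers, where the position of a child equals the popcount (logcount, in Lisp) of the bits below its subkey. The node-ref name is ours, for illustration only:

(defstruct hamt-node
  (bitmap 0 :type (unsigned-byte 32))
  (children #() :type simple-vector))

(defun node-ref (node subkey)
  "Return the child pointer for SUBKEY (an integer in 0..31), or nil."
  (let ((bitmap (hamt-node-bitmap node)))
    (when (logbitp subkey bitmap)
      (aref (hamt-node-children node)
            ;; popcount of the bits preceding the subkey's bit
            (logcount (ldb (byte subkey 0) bitmap))))))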

With such a structure, all major operations will have O(log32 n) complexity, which, for realistic sizes, means just a handful of steps, i.e. almost O(1). However, hash collisions are possible, so the hash-table-related collision considerations also apply to a HAMT. In other words, HAMTs are pretty similar to hash-tables, with the keys being split into parts and put into a trie. However, due to their tree-based nature, the memory footprint and the runtime performance of iteration and equality checking of HAMTs lag behind their array-based counterparts:

  • Increased memory overhead as each internal node adds an overhead over a direct array-based encoding, so finding a small representation for internal nodes is crucial.
  • On the other hand, HAMTs do not need expensive table resizing and do not waste (much) space on null references.
  • Iteration is slower due to non-locality, while a hash-table uses a simple linear scan through a continuous array.
  • Delete can cause the HAMT to deviate from the most compact representation (leaving nodes with no children in the tree)
  • Equality checking can be expensive due to non-locality and the possibility of a degenerate structure due to deletes.

So, what's the value of this structure if it's just a slightly less efficient hash-table? The difference is that a HAMT can not only be implemented with destructive operations but also, being a tree, can be easily adapted to persistent mode with the usual path-copying trick that we have already seen.

Complexity estimations for persistent data structures use amortized analysis to prove acceptable performance (O(log n)). Another trick at play here is called scheduling, and it lies in properly planning heavy structure-rebuilding operations and splitting them into chunks to avoid having to execute some of them at a time when optimal complexity can't be achieved. To learn more about these topics, read the seminal book by Chris Okasaki, "Purely Functional Data Structures"[8], which describes these methods in more detail and provides complexity analysis for various structures.

Besides, the immutability of persistent data structures enables additional optimizations that may be important in some scenarios:

  • native copy-on-write (COW) semantics that is required in some domains and algorithms
  • objects can be easily memoized
  • properties, such as hashes, sizes, etc., can be precomputed

The utility of persistent data structures is only gradually being realized and appreciated. Recently, some languages, including Clojure, were built around them as core structures. Moreover, some people even go as far as to claim that git is a purely functional data structure due to its principal reliance on structure-sharing persistent trees to store the data.

Take-aways

We have covered a lot of ground in this chapter at a pretty high level. Obviously, you can go much deeper: whole books have been written on the topics of concurrency and distributed computing.

Overall, concurrency can be approached from, at least, three different directions:

  1. There's a low-level view: the means that should be provided by the underlying platforms to support concurrent operation. It includes the threading/process APIs, atomic operations, and the synchronization and networking primitives.
  2. Then, there's an architecture viewpoint: what constraints our systems should satisfy and how to ensure that. At this level, the main distinctions are drawn: shared-memory vs shared-nothing, centralized vs peer-to-peer.
  3. And, last but not least, comes the algorithmic perspective. What data structures (as usual, they are, in fact, more important than the algorithms) can be used to satisfy the constraints in the most efficient way possible, or to simplify the architecture? We have seen several examples of special-purpose ones that cater to the needs of a particular problem: lock-free data structures, eventually-consistent, purely functional persistent ones. And then, there are some areas where special-purpose algorithms also play a major role. Their main purpose, there, is not so much computational efficiency (like we're used to), but, mostly, correctness coupled with good enough efficiency[9]. Mutual exclusion and distributed consensus algorithms are examples of such targeted algorithm families.

There's a lot of room for further research in the realms of synchronization and, especially, distributed computation. It is unclear whether the new breakthroughs will come from our current computing paradigms or whether we'll have to wait for a new tide and new approaches. Anyway, there's still a chance to make a serious and lasting contribution to the field by developing new algorithm-related stuff. And not only that. Unlike in other chapters, we haven't talked much here about the tools that can help a developer of concurrent programs. The reason for that is, actually, an apparent lack of such tools, at least of widely adopted ones. Surely, the toolbox we have already studied in the previous chapters is applicable here, but an environment with multiple concurrent threads and, possibly, multiple address spaces adds new classes of issues and seriously complicates debugging. There are network service tools to collect metrics and execution traces, but none of them is tightly integrated into the development toolboxes, not to speak of their limited utility. So, substantial pieces are still missing from the picture and are waiting to be filled in.


[1] We will further use the term "thread" to denote a separate computation running as part of our application, as it is less ambiguous than "process" and also much more widespread than all the other terms.

[2] This internal "threading", usually, also relies on the OS threading API behind the scenes.

[3] In this context, they tend to be called "processes", but we'll still stick to the term "thread".

[4] The other apparent limitation of supporting only two threads can be lifted by a modification to the algorithm, which requires some hardware support.

[5] If we forbid the destructive rplaca/rplacd operations and their derivatives.

[6] and a quite high logarithm base — usually 32 — which means very shallow trees, resulting in just a handful of hops even for quite large structures

[7] Except for the length of the leftmost range that depends on the number of bits in a hash. For instance, for a 32-bit hash, it may be 7, and the depth of the whole HAMT would be 5.

[8] His thesis with the same title is freely available, but the book covers more and is more accessible.

[9] The reason for that might be relative immaturity of this space, as well as its complexity, so that our knowledge of it hasn't been developed enough to reach the stage when optimization becomes the main focus.
