Coordinator - Design Discussions

This is a dump of discussions on both high level and implementation issues, mainly to help me keep track of them.


The cluster has a centralized coordinator service, and multiple masters and backups (clients for the coordinator). The coordinator manages the cluster membership, tablet configuration, and stores core metadata. The coordinator consists of small group of nodes. At any given point in time, exactly one of them is the leader, and the others are followers. Only the leader services the requests to the coordinator.

Coordinator Design Goals:

1. Thread Safety:

We want the coordinator to be thread safe.

2. Modularity:

Pull as much logic out of Coordinator as possible. Egs.:

  • TabletMap:
    • Examples: Atomic operations like addTablet, removeTabletsForTable, etc.
    • Done. Ryan's commit: fb0a623d3957a172314fbe0b53566a5699e1c0e6
  • Will:
    • Similar to TabletMap.
  • ServerList:
    • Examples:
      • Atomic operation to mark server crashed and update serverList version.
      • Any updates to serverList directly broadcasted to cluster without coordinator having to do it separately.

CoordinatorService also needs refactoring: Details.

3. Atomic Distributed State Change:


However, machines in the cluster can fail. This makes the distributed state change non-atomic. Non atomic state changes could result in inconsistent states.

Note: We're going to use createTable as a running example of the distributed state change operation.

A master can fail at various points in time:


Thus, by design, we have pushed out the job of taking care of inconsistencies arising due to master failures to master recovery.

The coordinator can also fail at various points in time:


Thus, the coordinator failure at some times can result in inconsistent state. (We can also rephrase this goal as fault tolerance.)

Commit Point:

For every coordinator operation, we can define a commit point, such that:

  1. If the coordinator failure occurs before commit point, the operation will be cleanly aborted (cleanly = no side effects / inconsistencies).
  2. If the coordinator failure occurs after the commit point, the operation is guaranteed to be completed.

Persisting information:

To achieve the above guarantee, we need to leave enough information around so that:

  1. The new leader is in the same state as the old leader
  2. The new leader can roll forward the operations to completion if needed.

This information has to be persisted across failures.

The leader should thus store information about requests being serviced in a highly reliable, consistent storage service - like LogCabin. LogCabin provides abstractions to append to and read from a highly reliable distributed log.

Encapsulating Information:

We encapsulate the information to be persisted in terms of state. The leader pushes the current state of the request being serviced to the log at distinct points. The new leader can infer the actions to be done from this state. That is, the state captures changes to the local state of the coordinator, as well as the communication done / to be done with the external world (masters / backups in the cluster) to change their state.

State Implementation:

State that is logged in LogCabin can be implemented as either a C++ struct or protobuf.

  • C++ structs would provide better performance; Protobuf implementation leads to extra overheads for serialization before appending to log and deserialization after extraction.
  • It is easier to express complex components in a protobuf (eg., a string with multiple pointers to various strings).

Since the latter could be important, and performance during writes is not a major concern for the coordinator (since we expect few writes), we choose protobuf implementation.

Logging states:

We log all the states necessary to make sure that if an operation was started, and failure occurred midway, it can be successfully completed during recovery. Note that for performance, we want to log as few states as possible while still getting the correctness. The general idea is to do the computations that can be done, log the state, then do actions that result in external effects (possibly based on the previous computations) – as many times as needed, and finally log that the operation was completed.

RPC thread and Replay thread:

The requests coming to the coordinator are serviced by the RPC thread in the current leader. The main motivation behind the replay thread is for the recovering coordinator (new leader) to "replay" the operations (if needed), and for the followers to keep track of what the leader is doing.

When the leader's RPC thread receives a request, it appends the request to the log. The leader's replay thread picks up the request from the log and does the required work. Every follower's replay thread can also follow the log in real time to keep their local state up to date, but only after that request has been completed (state indicates "done".) On recovery, the new leader's replay thread replays all log entries that have not yet been replayed, including the ones for incomplete actions. This can be represented by:

Design: RPC and Replay threads
// RPC
if (leader) {
    LogCabinEntry e = new LogCabinEntry(data about this operation)
    replyToRpc(read data to be returned from some common data struct or log); 

while (true) {
    e <- LogCabinIterator.getNextEntry();

    // Entry contains state for a pending request and i am not the leader
    // THEN stop at this point, and retry getting the entry.
    if (!e.done && me != leader) {

    // CASE 1: Entry contains state for a pending action and i am
    // a) recovering leader, OR
    // b) general case leader. OR
    // CASE 2: Entry contains state for a completed action and i am
    // a) follower trying to keep my state up-to-date in real time, OR
    // b) recovering leader catching up on state changes i missed,
    // THEN do the action corresponding to this entry.
    retVal = doAction(e);    // doAction is the dispatcher.

Note that when a client sends a request to the coordinator, we are not making any guarantees that that request will be completed. So it is perfectly fine to just drop a request in case of failure. However, we don't want to leave the cluster in an inconsistent state, which could happen if the original leader had partially serviced the request. This eliminates the need for rpc thread on the leader to append the state for the incoming request to the log. It can directly call the corresponding method in replay thread, which will then append the state when necessary. This modifies the above design to:

Change in Design: RPC and Replay threads
// RPC
if (leader) {
     retVal = doAction(opcode, rpc, NULL);    // doAction is the dispatcher.

while (true) {
    e <- LogCabinIterator.getNextEntry();

    // Same as previous design
    if (!e.done && me != leader) {

    // CASE 1a, 2a, 2b from previous design
    doAction(e.opcode, NULL, e);    // doAction is the dispatcher.

This describes, at a high level, the possible cases and the work to be done in each of the cases. A detailed discussion of the exact division of work between rpc thread and replay thread will be done in Coordinator Refactoring.

Order of execution on recovery:

When a leader fails and a new leader takes over, it will first replay the remaining log in order, and only after that start servicing requests / rpcs.

To understand why this decision was made, consider a simple example in the case where rpcs are serviced during replays:

The coordinator is servicing various requests, from various clients. One of these requests is a createTable request that has been shown explicitly. After completing this request, the coordinator informs the client of it completion. It then fails. On recovery, it replays the log and thus completes all the requests, including the createTable.

Now, while coordinator is replaying its log (note that clients know nothing about the coordinator crash or replay), the client decides to drop the table it had just created. So it sends a dropTable request. If that request is serviced during the replay, there's a chance that it will be serviced before the createTable log for that table is replayed, as shown below. This eventually results in an inconsistent state, wherein the client thinks that the table "foo" doesn't exist, and coordinator (and thus, the masters who own the tablets for "foo") think that it does exist.

Replaying the entire log before servicing new requests removes such inconsistencies.

Invalidating previous log entries:

Background note: The non-first time a log entry corresponding to a particular operation is written, it invalidates the previous log entry written for that operation. This can be implemented as: store the latest log entry number for an update to that table, in the coordinator's local data structure corresponding to that table - possibly "tables". The next time a log entry doing an operation on that table is written, it reads the previous log entry number from that data structure ("tables") and invalidates that log entry while writing the new one.


An operation's log entry should also invalidate other operations' log entries if those operations are rendered effect-less after this operation. This will clean up the lint from the log. For example, a deleteTable request should invalidate the createTable entry corresponding to that table. It should also invalidate any other operations that might have happened on the table or the tablets contained in it (like splitTablet).

Implementation of createTable (detailed discussion):

This is the detailed version of createTable algo:

First try:

The following logging model could work:

Assume the coordinator fails, and on recovery, only the first log entry is seen. This means that the previous coordinator could have talked to either 0, or 1, or 2, or all 3 of the masters. So, to ensure that the operation is successfully completed, the new coordinator will talk to all three again and then log done.

Complex failure scenario: Masters not available during recovery (e.g., cold boot):

Consider the following scenario: Coordinator had started a createTable operation and wrote the first entry corresponding to that operation in log. Then the coordinator crashes. Suppose many other masters crash too (cold boot / large scale failures). So during replay, assume that the number of masters available is 0. There may be masters in the cluster that are trying to enlist, but we can't service those requests right now due to the restriction that we have first complete replay then service requests. So how do we complete the operation?

Possible Solution: Undo when can't redo:

In such a scenario, it might be useful to just undo (or, clean abort) the operation that can't be completed. In our example, it would mean asking each master to drop ownership of the corresponding tablet, if it already had the ownership. Undo can help proceed with recovery when certain operations like createTable can't be completed.

Problem: For other operations like dropTable, it is easier to always complete the action rather than undo. Can there be a simpler solution?

Possible Solution: Get master recovery to do your work for you:

We change the above model of logging state to be:

First do all the computations. Then log it. This logging corresponds to the commit point – such that during coordinator recovery, the local state can be modified according to the logged state, and the rest of the operations will always be able to complete - either by direct execution, or handled by master recoveries. For instance, in this example, if M1 was down during coordinator recovery, coordinator can just add the entry to the tabletMap and will, and master recovery of M1 will take care of assigning that tablet ownership to M1.

Concretely, this is how the model for createTable would change:

That works.

Complex failure scenario: One (or more) of the computed masters not available during second half of the operation (either normal op or recovery).

Consider the following scenario: Coordinator had started a createTable operation and wrote the first entry corresponding to that operation in log. After this, some master, say M1 crashes. It may or may not have already started recovering in parallel. What to do when we get to the part where we have to actually add tablet map entries and assign tablets?

Possible solution 1: Recompute mapping.

Recompute T1 mapping, and log entire mapping. (Might be a little tricky implementation-wise.)

Problem: What if, before crash, T1 had already been successfully assigned to M1, but the entire createTable had not completed so we hadn't logged the operation to be done. Since it had been successfully assigned, a (timely) recovery of M1 would've recovered T1 on some other master.

Possible solution 2: Just add entries to tabletMap and recovery will take care of it. (Solution for previous failure scenario.)

Problem: This works only if recovery for M1 will start after we have added this entry to our tabletMap. However, the recovery could've already started or even completed. Adding this entry would basically refer to a server that will never exist or be recovered.

Possible solution 3: Add entries to tabletMap before logging to LogCabin.

Suppose we add entries to tabletMap as we compute them. Now, if M1 crashes, recovery will see that it was supposed to be assigned T1 and T1 will be recovered on some recovery master, say M40 (which means that master recovery will assign T1 tablet ownership to M40, and add entry T1 -> M40 to coordinator tabletMap).

Under this algo, when we get to the second part (which reads the computed mappings and assigns ownerships), we will just skip the entry corresponding to a non-existent master, since it must have already been taken care of.

Implementation note: In second part, for normal entries (masters that exist, and are being assigned ownership) I'd still want to add entries to local tabletMap if this is being called by recovery. Not needed if called by normal operation (but since it is an idempotent operation, we could do it again).

Problem: Suppose coordinator crashes after first part, and then M1 crashes. That means that M1 will not be recovered before the new coordinator starts and completes its recovery. When we get to recovery of this createTable, we have T1 -> M1 and M1 doesn't exist, and if we skip this without doing anything, then T1 will never be assigned to anyone.

Possible solution 4: Hold lock on serverlist for a short while such that we can use solution 2.

Hold lock on serverlist, compute mappings, add mappings to local coordinator state, release lock, log to logcabin. Note about timestamp (headoflog): We can safely put the headoflog as 0 while creating a table. This means that during table creation, i don't need to talk to masters to get the headoflog at all.

Better solution 5: Solution 2 just works with this realization!

During the entire create table operation (or most coordinator write operations for that matter), i have a table manager lock. This means that while i am doing the operation, if a master had crashed, when its recovery wants to read the tablet map from table manager, it will have to stall till this table creation has completed. If the coordinator had failed and is recovering the create table operation, then i anyway can't do master recovery in parallel since i'm first going to complete coordinator recovery before servicing rpcs. So solution 2 just works with this realization (the idea for which came from solution 4).

This works!

4. Multiple Coordinator Nodes:


As mentioned in the background on the top, the coordinator consists of small group of nodes. The following properties should hold:

  • At any given point in time, exactly one of them is the leader, and the others are followers.
  • One of the followers to take over as leader if the current leader fails.
  • Only the leader services the requests to the coordinator, and only the leader can write to log.
  • The followers follow the log in real time, and keep their local state updated, so that in case they have to take over, there is lesser log to replay. Note that this is an optimization, not a requirement.

There are a few main components to implementing this:

  1. Ensuring that I am the leader before appending to log / serving a request.
  2. Establishing, maintaining and broadcasting leadership. If I cannot establish leadership, then i am a follower.
  3. A way for the client to know who the leader of the coordinator is (so that they can talk to the leader).
  4. If i'm the follower, replaying certain type of logs, and starting to replay from the last replayed log location when i become the leader (or from the beginning, if i was not following). #ToDo.


Algorithm ("Safe Appends"):

Ensuring Leadership: If the last entry in the log was appended by me, then I am still the leader. Hence, each append should be a conditional append, conditional on whether the last entry in the log was appended by me.

Maintaining leadership: If i am the leader, I will be continuously logging state changes to the log, hence being the writer of the last entry in the log. However, it is possible that state changes do not happen for time > timeout (after which I might be deemed to be dead by another coordinator instance), and hence I might lose my leadership. Thus, I should append leader_alive entries to the log at timeout intervals.

Establishing leadership: If I am the follower, I will be checking the log at intervals back_off_interval > timeout to check whether there is an active leader right now. If not, I will take over as the new leader. Steps:

  1. Make a note of last entry id.
  2. Wait for timeout.
  3. Come back and check whether the current last entry id in the log is the same as the last entry id checked in step 1.
  4. If it is not the same, then another coordinator instance is an active leader, and this instance backs off for interval back_off_interval > timeout.
  5. If it is the same, then the other leader is probably dead, and this instance takes over as the leader. Append leader_alive entry to the log to establish this leadership.

Some form of a lease mechanism: The above two put together imply some form of a lease mechanism. Each instance of the coordinator checks the log at timeout intervals. It renews its lease ("maintaining leadership") if it was the leader, or gets a lease if it wasn't the leader and there is no active leader ("establishing leadership"), or backs off and tries again if someone else holds the lease right now (i.e., there is another active leader right now).

Providing a "hint" to clients:

When a client wants to talk to the coordinator, it has to contact the leader. We could have some mechanism in DNS to do this.

Alternately, client can find out who the leader is by reading information from logcabin, which we're using as our "ground truth" holder anyway. We could have a special log in logcabin, say "leaderId", that has just one valid entry pointing to the current leader. Once a leader establishes leadership, it can append its id to this log, providing a "hint" to the clients. #note1. If a client tries to contact the leader between these two steps, then it will end up contacting the older leader, who will not respond, and hence the client will retry the first step, by which point the log would probably have been updated.

#note1: Having two separate logs is not ideal, but the only possible solution right now. This is a situation where it would've just been better to have a key value model, since right now, i just want the current value of the "leader" key. There can be other times where the ordering of writes is important, which is when the log structured system is better. Tradeoff between the two.