This is currently just a dump of ideas and discussions. It will shortly be formatted well enough to be (hopefully) readable and understandable by others.

Cluster:

The cluster has a centralized coordinator service, and multiple masters and backups (clients of the coordinator). The coordinator consists of a small group of nodes. At any given point in time, exactly one of them is the leader, and the others are followers. Only the leader services requests sent to the coordinator.

General Coordinator Design:

Thread Safety:

We want the coordinator to be thread safe.

Modularity:

Pull as much logic out of Coordinator as possible, and have better* information hiding and modularity. Examples:

  • TabletMap:
    • Examples: Atomic operations like addTablet, removeTabletsForTable, etc.
    • Done. Ryan's commit: fb0a623d3957a172314fbe0b53566a5699e1c0e6
  • Will:
    • Similar to TabletMap.
  • ServerList:
    • Examples:
      • Atomic operation to mark server crashed and update serverList version.
      • Any update to serverList is broadcast directly to the cluster, without the coordinator having to do it separately.
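
The two ServerList operations above can be sketched as follows. This is only an illustration, not the actual ServerList: the class layout, the method names (`add`, `mark_crashed`, `snapshot`) and the `broadcast` callback are all assumptions made for the example. The point it shows is that marking a server crashed, bumping the version, and broadcasting the change happen under one lock, so callers never see one without the others.

```python
import threading


class ServerList:
    """Hypothetical sketch; names and layout are assumptions."""

    def __init__(self, broadcast):
        self._lock = threading.Lock()
        self._servers = {}           # server id -> "UP" or "CRASHED"
        self._version = 0            # bumped on every change
        self._broadcast = broadcast  # callable(version, server_id, status)

    def add(self, server_id):
        with self._lock:
            self._servers[server_id] = "UP"
            self._version += 1
            self._broadcast(self._version, server_id, "UP")

    def mark_crashed(self, server_id):
        """Mark a server crashed and bump the version as one atomic step."""
        with self._lock:
            if self._servers.get(server_id) != "UP":
                return False  # unknown or already crashed: nothing changes
            self._servers[server_id] = "CRASHED"
            self._version += 1
            # The update goes out from inside ServerList, so the
            # coordinator does not have to broadcast it separately.
            self._broadcast(self._version, server_id, "CRASHED")
            return True

    def snapshot(self):
        with self._lock:
            return self._version, dict(self._servers)


updates = []
servers = ServerList(lambda v, s, status: updates.append((v, s, status)))
servers.add("master1")
servers.mark_crashed("master1")
```

Broadcasting while holding the lock keeps updates ordered by version, at the cost of holding the lock for the duration of the broadcast. A real implementation might queue updates instead.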

Fault Tolerance:

To make the coordinator fault tolerant, the leader should persist information about the requests being serviced in a highly reliable, consistent storage service such as LogCabin. If the leader fails at some point, one of the followers takes over as the leader and can replay the log.

Encapsulating Information:

We encapsulate the information to be persisted in terms of state. The leader pushes the current state of the request being serviced to the log at distinct commit points. The state captures changes to the local state of the coordinator, as well as the communication done / to be done with the external world (masters / backups in the cluster) to change their state.
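
As a rough illustration of what such a state record could look like, the sketch below models one createTable request being pushed to the log at three commit points. Everything here is an assumption made for the example: the `StateEntry` fields, the `commit` helper, and the in-memory list standing in for LogCabin. It separates the local state already changed (`assigned`) from the communication still to be done with masters (`pending`).

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class StateEntry:
    """Hypothetical state record for one request (assumed layout)."""
    opcode: str
    table: str
    assigned: tuple = ()  # (tablet, master) pairs already recorded locally
    pending: tuple = ()   # tablets whose masters still have to be told
    done: bool = False


log = []  # in-memory stand-in for LogCabin (assumption)


def commit(entry):
    """Append the current state of the request to the log."""
    log.append(entry)
    return entry


# Commit point 1: the request is accepted, all external work is pending.
state = commit(StateEntry("createTable", "foo", pending=(0, 1)))

# Commit point 2: tablet 0 has been assigned to a master.
state = commit(replace(state,
                       assigned=state.assigned + ((0, "master1"),),
                       pending=state.pending[1:]))

# Commit point 3: tablet 1 has been assigned; the request is complete.
state = commit(replace(state,
                       assigned=state.assigned + ((1, "master2"),),
                       pending=(),
                       done=True))
```

Because each entry is an immutable snapshot, a node reading the last entry for a request knows both what has been applied and what is still outstanding.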

RPC thread and Replay thread:

The requests coming to the coordinator are serviced by the RPC thread. The actual work is done by a separate thread, called the replay thread. Only the current leader has an active RPC thread. The leader as well as the followers are continuously running the replay thread.

In the general case, when the leader's RPC thread receives a request, it appends the request to the log. The leader's replay thread follows the log, picks up the request from there, and does the required work. Every follower's replay thread can also follow the log in real time to keep its local state up to date, but it applies an entry only after that request has been completed (its state indicates "done"). On recovery, the new leader's replay thread replays all log entries that have not yet been replayed, including the ones for incomplete actions. This can be represented by:

Design: RPC and Replay threads
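// RPC
if (leader) {
    // Log the incoming request as not yet completed.
    LogCabinEntry e = new LogCabinEntry(actionNotCompleted = x);
    appendToLogCabin(e);
    // Block until the replay thread has serviced this request.
    wait(conditionVariable);
    replyToRpc(read data to be returned from some common data struct / log);
}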

// REPLAY
while (true) {
    e <- LogCabinIterator.getNextEntry()

    // If the entry contains state for a pending request and I am not
    // the leader, THEN stop at this point and retry getting the entry.
    if (!e.done && me != leader) {
        LogCabinIterator.goBack();
        continue;
    }

    // CASE 1: Entry contains state for a pending action and I am
    // a) recovering leader, OR
    // b) general case leader, OR
    // CASE 2: Entry contains state for a completed action and I am
    // a) follower trying to keep my state up-to-date in real time, OR
    // b) recovering leader catching up on state changes I missed,
    // THEN do the action corresponding to this entry.
    retVal = doAction(e);
    // Wake up the RPC thread waiting on this request, if any.
    signal(conditionVariable);
}

// doAction will dispatch the request to the appropriate method.
someDataStruct
doAction(e) {
    switch (e.opcode) {
        case "createTable":
            return doCreateTable(e);
        case "dropTable":
            return doDropTable(e);
        .
        .
        .
    }
}
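
The replay rule in the pseudocode above (a follower stops at the first pending entry, while a leader applies pending entries too) can be sketched in a runnable, single-threaded form. This is a simplification under stated assumptions: log entries are plain dicts with a `done` flag, the log is a Python list rather than LogCabin, and the hypothetical `replay` function returns to its caller instead of spinning, so that the caller decides when to retry.

```python
def replay(log, position, is_leader, apply):
    """Apply entries from log[position:] and return the new position.

    A follower stops at the first entry whose request is still pending;
    a leader (general case or recovering) applies pending entries too.
    """
    while position < len(log):
        entry = log[position]
        if not entry["done"] and not is_leader:
            # Same effect as goBack() in the pseudocode: this entry
            # is looked at again on the next call.
            break
        apply(entry)
        position += 1
    return position


log = [
    {"opcode": "createTable", "table": "a", "done": True},
    {"opcode": "createTable", "table": "b", "done": False},
]

# A follower applies the completed entry and stops at the pending one.
follower_applied = []
follower_pos = replay(log, 0, False, follower_applied.append)

# After becoming leader, the same node picks up where it stopped and
# also applies the entry for the incomplete action.
leader_applied = []
leader_pos = replay(log, follower_pos, True, leader_applied.append)
```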

 

Note that when a client sends a request to the coordinator, we make no guarantee that the request will be completed, so it is perfectly fine to drop a request in case of failure. However, we don't want to leave the cluster in an inconsistent state, which could happen if the original leader had partially serviced the request. For example, in the case of createTable, the leader might have assigned ownership of 2 of the 5 tablets to masters before it failed. So during recovery, we are only concerned with requests that had already started being serviced. This eliminates the need for the RPC thread on the leader to append the state for the incoming request to the log. It can directly call the corresponding method (the same one the replay thread calls), which will then append the state when necessary. This modifies the above design to:

Change in Design: RPC and Replay threads
// RPC
if (leader) {
    // Call the action directly; it logs its own state when necessary.
    retVal = doAction(opcode, rpc, NULL);
    replyToRpc(retVal);
}

// REPLAY
// Replay thread design is similar to the previous design.
// Note that CASE 1b does NOT exist anymore.
while (true) {
    e <- LogCabinIterator.getNextEntry()

    // If the entry contains state for a pending request and I am not
    // the leader, THEN stop at this point and retry getting the entry.
    if (!e.done && me != leader) {
        LogCabinIterator.goBack();
        continue;
    }

    // CASE 1: Entry contains state for a pending action and I am
    // a) recovering leader, OR
    // CASE 2: Entry contains state for a completed action and I am
    // a) follower trying to keep my state up-to-date in real time, OR
    // b) recovering leader catching up on state changes I missed,
    // THEN do the action corresponding to this entry.
    doAction(e.opcode, NULL, e);
}

// doAction will dispatch the request to the appropriate method.
// Exactly one of rpc (fresh request) and state (replay) is non-NULL.
someDataStruct
doAction(opcode, rpc, state) {
    switch (opcode) {
        case "createTable":
            return doCreateTable(rpc, state);
        case "dropTable":
            return doDropTable(rpc, state);
        .
        .
        .
    }
}
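
To make the modified design concrete, here is a small runnable sketch of a createTable handler that can start either from a fresh RPC or from logged state, and that resumes the "2 of 5 tablets assigned" case from the text. It is not the actual implementation: the dict-based state, the `assign_tablet` callback, the handler table, and the list standing in for LogCabin are all assumptions. The sketch also assumes that assigning a tablet is idempotent, because a crash between an assignment and its commit would cause that assignment to be repeated on recovery.

```python
log = []  # in-memory stand-in for LogCabin (assumption)


def do_create_table(rpc, state, assign_tablet):
    """Run createTable from an RPC (fresh) or from logged state (replay)."""
    if state is None:
        # Fresh request from the RPC thread: nothing has been logged yet.
        state = {"opcode": "createTable", "table": rpc["table"],
                 "masters": rpc["masters"], "assigned": 0, "done": False}
    state = dict(state)  # work on a copy, never on the logged entry
    while state["assigned"] < len(state["masters"]):
        tablet = state["assigned"]
        assign_tablet(state["table"], tablet, state["masters"][tablet])
        state["assigned"] += 1
        log.append(dict(state))  # commit point: one more tablet assigned
    state["done"] = True
    log.append(dict(state))      # commit point: request completed
    return state["table"]


HANDLERS = {"createTable": do_create_table}


def do_action(opcode, rpc, state, assign_tablet):
    """Dispatch to the handler; exactly one of rpc and state is set."""
    return HANDLERS[opcode](rpc, state, assign_tablet)


# State left behind by a leader that failed after 2 of 5 assignments.
crashed_state = {"opcode": "createTable", "table": "foo",
                 "masters": ["m0", "m1", "m2", "m3", "m4"],
                 "assigned": 2, "done": False}

calls = []
result = do_action("createTable", None, crashed_state,
                   lambda table, tablet, master: calls.append((tablet, master)))
```

In this run the recovering leader only performs the three assignments that were still outstanding, which is the behavior the text asks for.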

Order of execution on recovery:

When a leader fails and a new leader takes over, the new leader first replays the remaining log in order, and only after that starts servicing requests / RPCs.

To understand why this decision was made, consider a simple example of what can happen if RPCs are serviced during replay:

The coordinator is servicing various requests from various clients. One of these requests is a createTable request that has been shown explicitly. After completing this request, the coordinator informs the client of its completion. It then fails. On recovery, it replays the log and thus completes all the requests, including the createTable.

Now, while the coordinator is replaying its log (note that clients know nothing about the coordinator crash or replay), the client decides to drop the table it had just created, so it sends a dropTable request. If that request is serviced during the replay, there is a chance that it will be serviced before the createTable log entry for that table is replayed, as shown below. This eventually results in an inconsistent state, in which the client thinks that the table "foo" doesn't exist, while the coordinator (and thus the masters that own the tablets for "foo") thinks that it does exist.

Replaying the entire log before servicing new requests removes such inconsistencies.
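
The createTable / dropTable scenario can be reproduced with a toy model. The sketch below is an assumption-laden simplification: the coordinator state is reduced to a set of table names, log entries and RPCs are `(opcode, table)` pairs, and the hypothetical `recover` function models only the worst-case interleaving in which incoming RPCs are serviced before the matching log entries are replayed.

```python
def recover(log, incoming, replay_first):
    """Return the set of tables the new leader ends up with.

    log: entries left behind by the failed leader, in order.
    incoming: RPCs that arrive while the new leader is recovering.
    replay_first: if True, finish the replay before servicing any RPC.
    """
    tables = set()

    def apply(opcode, table):
        if opcode == "createTable":
            tables.add(table)
        elif opcode == "dropTable":
            tables.discard(table)  # dropping a missing table is a no-op

    if replay_first:
        ordered = list(log) + list(incoming)
    else:
        # Worst case: the new RPCs are serviced before the replay
        # reaches the log entries they depend on.
        ordered = list(incoming) + list(log)

    for opcode, table in ordered:
        apply(opcode, table)
    return tables


log = [("createTable", "foo")]
incoming = [("dropTable", "foo")]

consistent = recover(log, incoming, replay_first=True)     # set()
inconsistent = recover(log, incoming, replay_first=False)  # {"foo"}
```

With replay first, the table ends up dropped, matching what the client expects. With RPCs serviced during replay, the drop is applied to a table that does not exist yet and the later createTable entry brings "foo" back.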

Problem(s) with this model:

 

Get recovery to do your work for you: Changing the definition of "done":

 

 

 


* For some definition of this word.
