Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Next »

This is currently just a dump of ideas / discussions, mainly for keeping track of them. To be shortly formatted well enough to be (hopefully) readable and understandable by others.

Cluster:

The cluster has a centralized coordinator service, and multiple masters and backups (clients for the coordinator). The coordinator consists of small group of nodes. At any given point in time, exactly one of them is the leader, and the others are followers. Only the leader services the requests to the coordinator.

General Coordinator Design:

Thread Safety:

We want the coordinator to be thread safe.

Modularity:

Pull as much logic out of Coordinator as possible, and have better* information hiding and modularity. Egs.:

  • TabletMap:
    • Examples: Atomic operations like addTablet, removeTabletsForTable, etc.
    • Done. Ryan's commit: fb0a623d3957a172314fbe0b53566a5699e1c0e6
  • Will:
    • Similar to TabletMap.
  • ServerList:
    • Examples:
      • Atomic operation to mark server crashed and update serverList version.
      • Any updates to serverList directly broadcasted to cluster without coordinator having to do it separately.

Fault Tolerance:

To make coordinator fault tolerant, the leader should persist information about requests being serviced in a highly reliable, consistent storage service - like LogCabin. If this leader fails at some point, one of the follower takes over as the leader, and can replay the log.

Encapsulating Information:

We encapsulate the information to be persisted in terms of state. The leader pushes the current state of the request being serviced to the log at distinct commit points. The state captures changes to the local state of the coordinator, as well as the communication done / to be done with the external world (masters / backups in the cluster) to change their state.

RPC thread and Replay thread:

The requests coming to the coordinator are serviced by the RPC thread. The actual work is done by a separate thread, called the replay thread. Only the current leader has an active RPC thread. The leader as well as the followers are continuously running the replay thread.

In the general case, when the leader's rpc thread receives a request, it appends the request to the log. The leader's replay thread is following the log, and picks up the request from there and does the required work. Every follower's replay thread can also follow the log in real time to keep their local state up to date, but only after that request has been completed (state indicates "done".) On recovery, the new leader's replay thread replays all log entries that have not yet been replayed, including the ones for incomplete actions. This can be represented by:

Design: RPC and Replay threads
// RPC
if (leader) {
    LogCabinEntry e = new LogCabinEntry(actionNotCompleted = x)
    appendToLogCabin(e);
    wait(conditionVariable);
    replyToRpc(read data to be returned from some common data struct / log); 
}

// REPLAY
while (true) {
    e <- LogCabinIterator.getNextEntry()

    // Entry contains state for a pending request and i am not the leader
    // THEN stop at this point, and retry getting the entry.
    if (!e.done && me != leader) {
        LogCabinIterator.goBack();
        continue;
    }

    // CASE 1: Entry contains state for a pending action and i am
    // a) recovering leader, OR
    // b) general case leader. OR
    // CASE 2: Entry contains state for a completed action and i am
    // a) follower trying to keep my state up-to-date in real time, OR
    // b) recovering leader catching up on state changes i missed,
    // THEN do the action corresponding to this entry.
    retVal = doAction(e);
    signal(conditionVariable);
}

// doAction will dispatch the request to appropriate method.
someDataStruct
doAction(e) {
 switch (e.opcode) {
     case "createTable": 
         return doCreateTable(e);
         break;
     case "createTable":
         return doCreateTable(e);
         break;
     .
     .
     .
  }
} 

 

Note that when a client sends a request to the coordinator, we are not making any guarantees that that request will be completed. So it is perfectly fine to just drop a request in case of failure. However, we don't want to leave the cluster in an inconsistent state, which could happen if the original leader had partially serviced the request. For example, in case of createTable, the leader might have assigned the tablet ownership for 2 of the 5 tablets to masters before it failed. So during recovery, we're only concerned with requests that had already started being serviced. This eliminates the need for rpc thread on the leader to append the state for the incoming request to the log. It can directly call the corresponding method in replay thread, which will then append the state when necessary. This modifies the above design to:

Change in Design: RPC and Replay threads
// RPC
if (leader) {
     retVal = doAction(opcode, rpc, NULL) 
     replyToRpc(retVal);
}

// REPLAY
// Replay thread design is similar to previous design.
// Note that, CASE 1b does NOT exist anymore.
while (true) {
    e <- LogCabinIterator.getNextEntry()

    // Entry contains state for a pending request and i am not the leader
    // THEN stop at this point, and retry getting the entry.
    if (!e.done && me != leader) {
        LogCabinIterator.goBack();
        continue;
    }

    // CASE 1: Entry contains state for a pending action and i am
    // a) recovering leader. OR
    // CASE 2: Entry contains state for a completed action and i am
    // a) follower trying to keep my state up-to-date in real time, OR
    // b) recovering leader catching up on state changes i missed,
    // THEN do the action corresponding to this entry.
    doAction(e.opcode, NULL, e);
} 

// doAction will dispatch the request to appropriate method.
someDataStruct
doAction (opcode, rpc, state) {
     switch (opcode) {
        case "createTable":
            return doCreateTable(rpc, state);
            break;
        case "createTable":
            return doCreateTable(rpc, state);
            break;
        .
        .
        .  
    }
}
Order of execution on recovery:

When a leader fails and a new leader takes over, it will first replay the remaining log in order, and only after that start servicing requests / rpcs.

To understand why this decision was made, consider a simple example in the case where rpcs are serviced during replays:

The coordinator is servicing various requests, from various clients. One of these requests is a createTable request that has been shown explicitly. After completing this request, the coordinator informs the client of it completion. It then fails. On recovery, it replays the log and thus completes all the requests, including the createTable.

Now, while coordinator is replaying its log (note that clients know nothing about the coordinator crash or replay), the client decides to drop the table it had just created. So it sends a dropTable request. If that request is serviced during the replay, there's a chance that it will be serviced before the createTable log for that table is replayed, as shown below. This eventually results in an inconsistent state, wherein the client thinks that the table "foo" doesn't exist, and coordinator (and thus, the masters who own the tablets for "foo") think that it does exist.

Replaying the entire log before servicing new requests removes such inconsistencies.

Logging state:

We log all the states necessary to make sure that if an operation was started, and failure occurred midway, it can be successfully completed during recovery. Note that for performance, we want to log as few states as possible while still getting the correctness. The general idea is to do the computations that can be done, log the state, then do actions that result in external effects (possibly based on the previous computations) – as many times as needed, and finally log that the operation was completed.

For createTable, the following model could work:

Assume the coordinator fails, and on recovery, only the first log entry is seen. This means that the previous coordinator could have talked to either 0, or 1, or 2, or all 3 of the masters. So, to ensure that the operation is successfully completed, the new coordinator will talk to all three again and then log done.

Problem(s) with this model:

During recovery there can be cases when some operations that were originally started (the first entry corresponding to that operation written in log) can't be completed ("done").

Consider the failure scenario described in previous section. At this point, assume that the number of masters available is 0. There may be masters in the cluster that are trying to enlist, but we can't service those requests right now due to the restriction that we have first complete replay then service requests.

Undo when can't redo:

In such a scenario, it might be useful to just undo (or, clean abort) the operation that can't be completed. In our example, it would mean asking each master to drop ownership of the corresponding tablet, if it already had the ownership.

Undo can help proceed with recovery when certain operations like createTable can't be completed. Note that for other operations like dropTable, it is easier to always complete the action rather than undo. Can there be a simpler solution? (Yes, see below.)

Get master recovery to do your work for you: Changing the definition of commit point:

We change the above model of logging state to be:

First do all the computations. Then log it. This logging is the commit point – such that during coordinator recovery, the local state can be modified according to the logged state, and the rest of the operations will always be able to complete - either by direct execution, or handled by master recoveries.

Concretely, this is how the model for createTable would change:

Invalidating previous log entries

Certain operations' log entries can also invalidate previous log entries. For example, a deleteTable request can invalidate (i.e., delete) the createTable entry corresponding to that table. Implementation note: For this example, we can keep around the log entry number in the coordinator local data structure corresponding to that table - possibly "tables". Possible complication (with this type of invalidation): What if there are other operations that operate on the same data, but not as obvious? Eg., createTable followed by a couple split tablet, migrate tablet, etc requests, and then the deleteTable request. Now, can / should the deleteTable log entry invalidate the other log entries as well?

  • No labels