This is a dump of discussions on both high-level and implementation issues, mainly to help me keep track of them.

Background:

The cluster has a centralized coordinator service, and multiple masters and backups (clients of the coordinator). The coordinator manages cluster membership and tablet configuration, and stores core metadata. The coordinator consists of a small group of nodes. At any given point in time, exactly one of them is the leader, and the others are followers. Only the leader services requests sent to the coordinator.

Coordinator Design Goals:

1. Thread Safety:

We want the coordinator to be thread safe.

2. Modularity:

Pull as much logic out of Coordinator as possible. Examples:

  • TabletMap:
    • Examples: Atomic operations like addTablet, removeTabletsForTable, etc.
    • Done. Ryan's commit: fb0a623d3957a172314fbe0b53566a5699e1c0e6
  • Will:
    • Similar to TabletMap.
  • ServerList:
    • Examples:
      • Atomic operation to mark server crashed and update serverList version.
      • Any updates to serverList are broadcast directly to the cluster, without the coordinator having to do so separately.
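
As a rough illustration of the ServerList goals above, here is a minimal sketch in which marking a server crashed, bumping the version, and handing the update to a broadcast hook all happen inside one critical section. The class name ServerListSketch, the ServerStatus enum, and the callback signature are assumptions made for this example; they are not the actual ServerList interface.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical status values; the real ServerList may track more states.
enum class ServerStatus { UP, CRASHED };

class ServerListSketch {
  public:
    // Stand-in for "broadcast this update to the cluster" (an assumption).
    using Broadcast = std::function<void(std::uint64_t version,
                                         const std::string& serverId,
                                         ServerStatus status)>;

    explicit ServerListSketch(Broadcast cb) : broadcast(std::move(cb)) {
        assert(static_cast<bool>(broadcast));  // a callback must be supplied
    }

    // Adds a server, bumps the version, and broadcasts the update.
    void add(const std::string& serverId) {
        std::lock_guard<std::mutex> guard(mutex);
        servers[serverId] = ServerStatus::UP;
        ++version;
        broadcast(version, serverId, ServerStatus::UP);
    }

    // Marks the server crashed and bumps the version in one critical
    // section, so no caller can observe one change without the other.
    // Returns false if the server is unknown or already crashed.
    bool markCrashed(const std::string& serverId) {
        std::lock_guard<std::mutex> guard(mutex);
        auto it = servers.find(serverId);
        if (it == servers.end() || it->second == ServerStatus::CRASHED)
            return false;
        it->second = ServerStatus::CRASHED;
        ++version;
        broadcast(version, serverId, ServerStatus::CRASHED);
        return true;
    }

    std::uint64_t getVersion() const {
        std::lock_guard<std::mutex> guard(mutex);
        return version;
    }

  private:
    mutable std::mutex mutex;
    std::map<std::string, ServerStatus> servers;
    std::uint64_t version = 0;
    Broadcast broadcast;
};
```

In this sketch the callback runs while the lock is held, which keeps updates ordered by version but would block other callers during a slow broadcast; a real implementation might queue the update instead.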

CoordinatorService also needs refactoring: Details.

3. Atomic Distributed State Change:

Introduction:

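Many coordinator operations change state on more than one machine: the coordinator updates its own local state and also tells masters or backups to update theirs. However, machines in the cluster can fail partway through such an operation. This makes the distributed state change non-atomic, and non-atomic state changes can result in inconsistent states.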

Note: We're going to use createTable as a running example of the distributed state change operation.

A master can fail at various points in time:

 

Thus, by design, we delegate the handling of inconsistencies caused by master failures to master recovery.

The coordinator can also fail at various points in time:

 

Thus, a coordinator failure at certain points can result in an inconsistent state. (We can also rephrase this goal as fault tolerance.)

Commit Point:

For every coordinator operation, we can define a commit point, such that:

  1. If the coordinator fails before the commit point, the operation will be cleanly aborted (cleanly = no side effects / inconsistencies).
  2. If the coordinator fails after the commit point, the operation is guaranteed to be completed.
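
The two rules above can be sketched as a small decision function that a new leader might run for each operation it finds in the log. The entry kinds (COMMIT, DONE), the LogEntry layout, and the function name are assumptions made for illustration only.

```cpp
#include <cassert>
#include <vector>

// Hypothetical entry kinds: COMMIT marks the commit point of an
// operation, DONE marks that the operation finished.
enum class EntryKind { COMMIT, DONE };

struct LogEntry {
    int opId;        // identifies the operation this entry belongs to
    EntryKind kind;
};

enum class RecoveryAction { ABORT, ROLL_FORWARD, NOTHING };

// Decides what a new leader should do for one operation, given the log.
RecoveryAction decideOnRecovery(const std::vector<LogEntry>& log, int opId) {
    bool committed = false;
    bool done = false;
    for (const LogEntry& e : log) {
        if (e.opId != opId)
            continue;
        if (e.kind == EntryKind::COMMIT)
            committed = true;
        if (e.kind == EntryKind::DONE)
            done = true;
    }
    // A DONE entry without a COMMIT entry would indicate a corrupt log.
    assert(!done || committed);
    if (done)
        return RecoveryAction::NOTHING;       // already completed
    if (committed)
        return RecoveryAction::ROLL_FORWARD;  // rule 2: must be completed
    return RecoveryAction::ABORT;             // rule 1: no side effects
}
```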

Persisting information:

To achieve the above guarantee, we need to leave enough information around so that:

  1. The new leader is in the same state as the old leader
  2. The new leader can roll forward the operations to completion if needed.

This information has to be persisted across failures.

The leader should thus store information about requests being serviced in a highly reliable, consistent storage service - like LogCabin. LogCabin provides abstractions to append to and read from a highly reliable distributed log.

Encapsulating Information:

We encapsulate the information to be persisted in terms of state. The leader pushes the current state of the request being serviced to the log at distinct points. The new leader can infer the actions to be done from this state. That is, the state captures changes to the local state of the coordinator, as well as the communication done / to be done with the external world (masters / backups in the cluster) to change their state.

State Implementation:

State that is logged in LogCabin can be implemented as either a C++ struct or protobuf.

  • C++ structs would provide better performance; a protobuf implementation adds overhead for serialization before appending to the log and for deserialization after reading from it.
  • It is easier to express complex components in a protobuf (e.g., a string with multiple pointers to various strings).

Since the latter could be important, and performance during writes is not a major concern for the coordinator (since we expect few writes), we choose the protobuf implementation.

Logging states:

We log all the states necessary to make sure that if an operation was started and a failure occurred midway, it can be successfully completed during recovery. Note that for performance, we want to log as few states as possible while still preserving correctness. The general idea is to do the computations that can be done, log the state, and then do the actions that have external effects (possibly based on the previous computations). This sequence is repeated as many times as needed, and finally we log that the operation was completed.
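
The "compute, log, act, log done" pattern described above might look roughly like the following. The std::vector standing in for the replicated log, the StateEntry fields, and the notifyMaster callback are all assumptions for this sketch; a real coordinator would append protobuf-encoded state to LogCabin.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical logged state for one operation.
struct StateEntry {
    std::string operation;                    // e.g. "createTable foo"
    std::vector<std::string> plannedMasters;  // result of the computation
    bool done;                                // true once fully completed
};

// In-memory stand-in for the replicated log (an assumption).
using Log = std::vector<StateEntry>;

// Runs one operation: the caller has already done the computation
// (plannedMasters); we log it, perform the external effects, then log done.
void runOperation(Log& log, const std::string& operation,
                  const std::vector<std::string>& plannedMasters,
                  const std::function<void(const std::string&)>& notifyMaster) {
    assert(!plannedMasters.empty());
    // Log the computed state before any external effect happens.
    log.push_back({operation, plannedMasters, false});
    // Perform the external effects; a crash here leaves only the first
    // entry in the log, which tells recovery what remains to be done.
    for (const std::string& master : plannedMasters)
        notifyMaster(master);
    // Log that the operation completed.
    log.push_back({operation, plannedMasters, true});
}
```

If notifyMaster fails partway through (simulating a coordinator crash), the log holds a single entry with done set to false, which is exactly the information a new leader needs in order to finish the operation.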

RPC thread and Replay thread:

The requests coming to the coordinator are serviced by the RPC thread in the current leader. The main motivation behind the replay thread is for the recovering coordinator (new leader) to "replay" the operations (if needed), and for the followers to keep track of what the leader is doing.

When the leader's RPC thread receives a request, it appends the request to the log. The leader's replay thread picks up the request from the log and does the required work. Every follower's replay thread can also follow the log in real time to keep its local state up to date, but it applies a request only after that request has been completed (its state indicates "done"). On recovery, the new leader's replay thread replays all log entries that have not yet been replayed, including the ones for incomplete actions. This can be represented by:

Design: RPC and Replay threads
// RPC
if (leader) {
    LogCabinEntry e = new LogCabinEntry(data about this operation);
    appendToLogCabin(e);
    wait(conditionVariable);    // signalled by the replay thread
    replyToRpc(read data to be returned from some common data struct or log);
}

// REPLAY
while (true) {
    e <- LogCabinIterator.getNextEntry();

    // If the entry contains state for a pending request and I am not the
    // leader, THEN stop at this point and retry getting the entry.
    if (!e.done && me != leader) {
        LogCabinIterator.goBack();
        continue;
    }

    // CASE 1: Entry contains state for a pending action and I am
    // a) a recovering leader, OR
    // b) the leader in the general case, OR
    // CASE 2: Entry contains state for a completed action and I am
    // a) a follower trying to keep my state up to date in real time, OR
    // b) a recovering leader catching up on state changes I missed,
    // THEN do the action corresponding to this entry.
    retVal = doAction(e);    // doAction is the dispatcher.
    signal(conditionVariable);
}

Note that when a client sends a request to the coordinator, we make no guarantee that the request will be completed, so it is perfectly fine to drop a request if the coordinator fails. However, we don't want to leave the cluster in an inconsistent state, which could happen if the original leader had partially serviced the request. Since a request that has not yet caused any state change can safely be dropped, the RPC thread on the leader does not need to append the state for an incoming request to the log. It can directly call the corresponding method in the replay thread, which will then append the state when necessary. This modifies the above design to:

Change in Design: RPC and Replay threads
// RPC
if (leader) {
    retVal = doAction(opcode, rpc, NULL);    // doAction is the dispatcher.
    replyToRpc(retVal);
}

// REPLAY
while (true) {
    e <- LogCabinIterator.getNextEntry();

    // Same as previous design
    if (!e.done && me != leader) {
        LogCabinIterator.goBack();
        continue;
    }

    // CASE 1a, 2a, 2b from previous design
    doAction(e.opcode, NULL, e);    // doAction is the dispatcher.
}

This describes, at a high level, the possible cases and the work to be done in each of the cases. A detailed discussion of the exact division of work between rpc thread and replay thread will be done in Coordinator Refactoring.
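
To make the changed design a little more concrete, here is a single-threaded sketch of one dispatcher being called from both the RPC path (no log entry) and the replay path (with a log entry). Everything in it is an assumption for illustration: the CoordinatorSketch type, the two opcodes, the in-memory log, and the simplification that a fresh RPC is logged as a single completed entry and that replay does not append further entries.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

enum class Opcode { CREATE_TABLE, DROP_TABLE };  // hypothetical opcodes

struct Entry {
    Opcode opcode;
    std::string table;
    bool done;
};

struct CoordinatorSketch {
    bool isLeader = false;
    std::vector<Entry> log;        // stand-in for the LogCabin log
    std::size_t nextToReplay = 0;  // position of the replay iterator
    std::set<std::string> tables;  // local coordinator state

    // Dispatcher: exactly one of rpcTable (fresh RPC) or entry (replay)
    // is non-null.
    void doAction(Opcode opcode, const std::string* rpcTable,
                  const Entry* entry) {
        assert((rpcTable != nullptr) != (entry != nullptr));
        const std::string table = entry ? entry->table : *rpcTable;
        if (opcode == Opcode::CREATE_TABLE)
            tables.insert(table);
        else
            tables.erase(table);
        // Only the RPC path appends state; replay reads existing state.
        if (entry == nullptr)
            log.push_back({opcode, table, true});
    }

    // RPC path: only the leader services requests, and only after replay
    // has caught up with the log.
    bool handleRpc(Opcode opcode, const std::string& table) {
        if (!isLeader)
            return false;
        assert(nextToReplay == log.size());
        doAction(opcode, &table, nullptr);
        nextToReplay = log.size();  // the leader need not replay its own entry
        return true;
    }

    // One iteration of the replay loop. Returns false when there is
    // nothing that can be replayed right now.
    bool replayOne() {
        if (nextToReplay >= log.size())
            return false;
        const Entry e = log[nextToReplay];
        if (!e.done && !isLeader)
            return false;  // follower waits: the "goBack and retry" case
        ++nextToReplay;
        doAction(e.opcode, nullptr, &e);
        return true;
    }
};
```

A follower given a log with one completed and one pending entry applies only the first; once it becomes leader, the next call to replayOne applies the pending entry as well.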

Order of execution on recovery:

When a leader fails and a new leader takes over, it will first replay the remaining log in order, and only after that start servicing requests / rpcs.

To understand why this decision was made, consider a simple example in the case where rpcs are serviced during replays:

The coordinator is servicing various requests from various clients. One of these requests is a createTable request that has been shown explicitly. After completing this request, the coordinator informs the client of its completion. It then fails. On recovery, it replays the log and thus completes all the requests, including the createTable.

Now, while the coordinator is replaying its log (note that clients know nothing about the coordinator crash or replay), the client decides to drop the table it had just created, so it sends a dropTable request. If that request is serviced during the replay, there's a chance that it will be serviced before the createTable log entry for that table is replayed, as shown below. This eventually results in an inconsistent state, wherein the client thinks that the table "foo" doesn't exist, while the coordinator (and thus the masters that own the tablets for "foo") thinks that it does exist.

Replaying the entire log before servicing new requests removes such inconsistencies.
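
The createTable / dropTable scenario above can be reproduced with a toy model. The Op struct and the recover function are hypothetical; the flag replayFirst selects between servicing the incoming request before the remaining log is replayed (the problematic ordering) and after it (the chosen design).

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical operation: create == true means createTable,
// create == false means dropTable.
struct Op {
    bool create;
    std::string table;
};

void apply(std::set<std::string>& tables, const Op& op) {
    if (op.create)
        tables.insert(op.table);
    else
        tables.erase(op.table);
}

// Returns the set of tables on the new leader after recovery, where
// `incoming` is a client request that arrives while replay is in progress.
std::set<std::string> recover(const std::vector<Op>& remainingLog,
                              const Op& incoming, bool replayFirst) {
    std::set<std::string> tables;
    if (!replayFirst)
        apply(tables, incoming);  // serviced before the log is replayed
    for (const Op& op : remainingLog)
        apply(tables, op);
    if (replayFirst)
        apply(tables, incoming);  // serviced only after replay completes
    return tables;
}
```

With a remaining log of one createTable for "foo" and an incoming dropTable for "foo", the first ordering leaves "foo" in place even though the client was told it was dropped, while the second ordering ends with "foo" absent.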

Invalidating previous log entries:

Background note: Every time after the first that a log entry is written for a particular operation, it invalidates the previous log entry written for that operation. This can be implemented as follows: store the number of the latest log entry for an update to a table in the coordinator's local data structure corresponding to that table (possibly "tables"). The next time a log entry for an operation on that table is written, the coordinator reads the previous log entry number from that data structure ("tables") and invalidates that log entry while writing the new one.

Now,

An operation's log entry should also invalidate other operations' log entries if those operations no longer have any effect after this operation. This removes stale entries from the log. For example, a dropTable request should invalidate the createTable entry corresponding to that table. It should also invalidate any other operations that might have happened on the table or the tablets contained in it (like splitTablet).
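
A minimal sketch of both invalidation rules follows. It assumes that entries are identified by their index in the log, that the "tables" structure maps each table to the latest live entry per operation name, and that invalidation is just a flag; none of these details come from the actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

class InvalidatingLogSketch {
  public:
    // Appends an entry for `op` on `table`, invalidating the previous
    // entry written for the same operation on that table, if any.
    std::size_t append(const std::string& table, const std::string& op) {
        std::map<std::string, std::size_t>& live = tables[table];
        auto it = live.find(op);
        if (it != live.end())
            valid[it->second] = false;
        valid.push_back(true);
        const std::size_t id = valid.size() - 1;
        live[op] = id;
        return id;
    }

    // Appends a table-removal entry and invalidates every live entry
    // for that table (createTable, splitTablet, and so on).
    std::size_t appendDrop(const std::string& table) {
        for (const auto& opAndId : tables[table])
            valid[opAndId.second] = false;
        tables.erase(table);
        valid.push_back(true);
        return valid.size() - 1;
    }

    bool isValid(std::size_t id) const {
        assert(id < valid.size());
        return valid[id];
    }

  private:
    std::vector<bool> valid;  // valid[i] is false once entry i is invalidated
    // table -> (operation name -> latest live entry id); plays the role
    // of the per-table bookkeeping described in the text.
    std::map<std::string, std::map<std::string, std::size_t>> tables;
};
```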

Implementation of createTable (detailed discussion):

This is the detailed version of createTable algo:


First try:

The following logging model could work:

Assume the coordinator fails, and on recovery, only the first log entry is seen. This means that the previous coordinator could have talked to either 0, or 1, or 2, or all 3 of the masters. So, to ensure that the operation is successfully completed, the new coordinator will talk to all three again and then log done.

Problem(s) with this model:

During recovery there can be cases in which some operations that were originally started (that is, the first entry for the operation was written to the log) can't be completed ("done").

Consider the failure scenario described in the previous section. At this point, assume that the number of masters available is 0. There may be masters in the cluster that are trying to enlist, but we can't service those requests right now because of the restriction that we must first complete the replay and only then service requests.

Undo when can't redo:

In such a scenario, it might be useful to just undo (or, clean abort) the operation that can't be completed. In our example, it would mean asking each master to drop ownership of the corresponding tablet, if it already had the ownership.
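
As an illustration of such an undo, the sketch below walks over the planned (master, tablet) assignments and drops ownership wherever a master actually holds the tablet. The MasterTablets map is a hypothetical stand-in for asking each master over RPC; unreachable masters are simply skipped here.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical view of the cluster: master name -> tablet ids it owns.
using MasterTablets = std::map<std::string, std::set<std::uint64_t>>;
// A planned assignment from the logged createTable state.
using Assignment = std::pair<std::string, std::uint64_t>;

// Asks each master to drop ownership of its planned tablet if it already
// had it. Returns how many ownerships were actually dropped. Running it
// twice is harmless, which matters if recovery itself is interrupted.
int undoCreateTable(MasterTablets& masters,
                    const std::vector<Assignment>& planned) {
    int dropped = 0;
    for (const Assignment& a : planned) {
        auto it = masters.find(a.first);
        if (it == masters.end())
            continue;  // master not reachable in this sketch
        if (it->second.erase(a.second) == 1)
            ++dropped;
    }
    return dropped;
}
```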

Undo lets recovery proceed when certain operations, like createTable, can't be completed. Note that for other operations, like dropTable, it is easier to always complete the action rather than undo it. Can there be a simpler solution? (Yes, see below.)

Get master recovery to do your work for you: Changing the definition of commit point:

We change the above model of logging state to be:

First do all the computations, then log the result. This log entry corresponds to the commit point: during coordinator recovery, the local state can be modified according to the logged state, and the rest of the operation will always be able to complete, either by direct execution or through master recoveries. For instance, in this example, if M1 was down during coordinator recovery, the coordinator can just add the entry to the tabletMap and Will, and master recovery of M1 will take care of assigning that tablet ownership to M1.
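
Under this revised commit point, replaying a committed createTable entry might be sketched as follows. The TabletAssignment and ReplayResult types and the liveMasters set are assumptions; the point is only that the local tablet map is always updated from the logged state, while masters that are down are left to master recovery instead of blocking the replay.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical logged state: which master should own which tablet.
struct TabletAssignment {
    std::uint64_t tabletId;
    std::string master;
};

struct ReplayResult {
    std::map<std::uint64_t, std::string> tabletMap;  // local state after replay
    std::vector<std::string> notified;               // contacted directly
    std::vector<std::string> leftToMasterRecovery;   // down during replay
};

// Replays a committed createTable entry on the new leader.
ReplayResult replayCommittedCreateTable(
        const std::vector<TabletAssignment>& logged,
        const std::set<std::string>& liveMasters) {
    assert(!logged.empty());
    ReplayResult result;
    for (const TabletAssignment& a : logged) {
        // The local state always follows the logged state.
        result.tabletMap[a.tabletId] = a.master;
        if (liveMasters.count(a.master) != 0)
            result.notified.push_back(a.master);
        else
            result.leftToMasterRecovery.push_back(a.master);
    }
    return result;
}
```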

Concretely, this is how the model for createTable would change:
