DCFT Paper Notes
2/17
- Collin is continuing with Map-Reduce implementation
- Unclear if rules-based code will provide full benefit
- Seems like most "faults" arise "synchronously" so far in his case, so he could handle them by propagating them rather than having to poll for failures.
- TODO(stutsman): Move Survey of DCFT Systems up to Section 2; start writing up the Replica Manager in the paper draft
- Concise, but complete description
- No need to give reasons for details (segments have ordering constraints, don't explain why)
- Eventually section will want several other short examples
- Some from in and out of RAMCloud
- John will do write up of the ServerList
- Interesting point to try to work in: based on Sparrow feedback
- Everyone starts without handling faults
- The hope is that adding fault-handling is a matter of adding try/catch.
- Turns out it has resulted in complete rework for a us a few times.
- Another point: want to avoid rules-based programming whenever possible
- It's less efficient, and it is hard
- That's one reason why it's great that it is easy to intermix with threaded/imperative code
- In several places in RAMCloud we contain fault-handling complexity within a subsystem and then the rest of the code that interacts with the system is easy
2/10 Meeting - Conversation about Actors
Similarities to Actors
- Prescriptive, constrained approach (even more so)
- Sacrifice some efficiency for ease of reasoning
- Rules -> Message Handlers (Behaviors)
- Tasks -> Actors
- Pools -> Pools (called "Frameworks" in some Actor systems)
- Both use messages to notify of failures
Differences to Actors
- Constrained so one message is handled per Actor at a time
- Shared state is disallowed, except when manipulated explicitly via messages
- Actors react to messages, not state
- Could convert messages to state, then fire a message to induce rule recheck
- Problem: often want to trigger rules when no state updates (messages) are left
- May be solved by doing everything with actors?
- Priority inversion issues on messages
Sound Bytes
- Rules-based code is internally synchronous, but the system is externally asynchronous. Would like a synchronous world, so we convert the async world into something synchronous. But, there are limits, which is why rules have to stay small.
- Actors: everything is asynchronous all the time; can't even get safe access to shared state.
- Question that arose: what's really the best case for Actor performance with all of the message passing?
- Is there an efficient Actor implementation?
- stutsman: Probably worth looking at the ('06?) Scala actor paper.
- TODO(stutsman): Read Needham, Lauer duality paper
- TODO(stutsman): Read up a bit on Actors; Hewitt papers of 60s, 70s
- More recent canonical discussion seems to be Agha dissertation
- Looked through that quite a bit pulled a few interesting bits out of it (see below)
Fragments from Agha Dissertation
- Several observations are in order here. Firstly, the behavior of an actor can be history sensitive. Secondly, there is no presumed sequentiality in the actions an actor performs since, mathematically, each of its actions is a function of the actor's behavior and the incoming communication.
- The shared variables approach does not provide any mechanism for abstraction and information hiding.. For instance, there must be predetermined protocols so tat one process can determine if another has written the results 'it needs into the relevant variables.
- Buffered asynchronous communication affords us efficiency 'in execution by pipelining the actions to be performed.
1/27 Meeting
TODO
- Work up an outline of the paper; send around (stutsman)
- Gather data
- Fix TableManager in RAMCloud (stutsman)
- Redo HDFS
- Block replication management on NameNode
- And/Or DataNode to DataNode block replication
- Does Sparrow have DCFT? Could parts of it be reworked?
- Need a strong non-RAMCloud example
- One possibility: a system that already (implicitly) uses rules.
- Another possibility: show a system that's a mess and show how rules clean it up.
Sound Bytes
- Think as rules rather than as algorithms
- What is the difference to thinking in terms of threads or in terms of events?
- Claim: we don't believe a problem this hard/diverse has a one-size-fits-all solution; hence, we've tried to extract the core concepts. Implementation of the concepts will look different under contexts and requirements. The power is in the fact that the same approach can be applied to all fault-tolerant code regardless of platform, programming language, network/protocols, or even the specific types of faults that occur.
- Position strongly as a concept/experience paper
- Probably want the word "Experience" in the title like the monitors paper
- "Experiences with Rules-driven Programming for Fault-tolerant Systems" etc.
- Claim: several modules in RAMCloud were attempted in "traditional" threaded style, and they ended up needing to be reworked. Often, there was a tendency to want to write really simple looping, blocking, or locking code somewhere deep in places that looked like it didn't matter (success seemed guaranteed, locks seems short, etc). These were a constant thorn. We should try to list as many as we can think of.
- BackupSelector was an offender on many occasions. Each time we'd make it a little better, without going the whole way, and each time we'd have to come back and fix it up.
- One of the constant temptations was to "spin" in the selector waiting for new backups to get added to the list. But this prevented higher-level error handling cases from kicking in.
- TableManager
- BackupRecoveryManager
- ServerList management
- (add more as we think of them)
- BackupSelector was an offender on many occasions. Each time we'd make it a little better, without going the whole way, and each time we'd have to come back and fix it up.
- Recomputing what to do next from scratch is the key to making things easier to reason about; relying in the PC is efficient, but it's burdened with assumptions that make it difficult to use effectively.
- The rules-based approach unapologetically less efficient.
- On the other hand, we've used it to great effect in our high-performance system.
- Rules become really critical when failure-handling may involve failure-handling. Even more so, when failure handling may be recursive, mutually-recursive, or iterative.
Collin's Random Notes
WIP distillation of thoughts on DCFT.
Python MapReduce Notes
Interesting that my implementation also had three levels: Master/Scheduler (Job), TaskWrapper (Task), and Task (TaskAttempt). I used a "nested rules" approach but in reality it is likely that it could have been collapsed in one top level rules set. I should mention there is a great deal of similarity between a MapTask and a ReduceTask in terms of it's rules. How do you modularize Task classes? Can you subclass a Task class? The below table does not include the RPC rules. The table also does not account for the rules for the membership service (though that likely should be a different module).
Module | Rule Count | Comments |
---|---|---|
Master/Scheduler | 1 | Basically just the rule to prevent deadlock by preempting tasks |
MapTaskSet | 2 | For straggler reissue. |
MapTask/Wapper | 5 | |
ReduceTaskSet | 0 | Just pool management. |
ReduceTask | 4 | |
Total | 12 | Server failure "event" rule is not included (i.e. the handle that sets the isAlive bit is not counted as a rule). "Event" rules for setting the status of an RPC are also not included. The RPC status is considered a state field. |
Hadoop MapReduce Walkthrough
MRAppMaster.java
1354: main starts here
1385: MRAppMaster constructed as appMaster
1409: initAndStartAppMaster called on appMaster
1450: initAndStartAppMaster starts here
1479: appMaster init and start, MRAppMaster is CompositeService is AbstractService, init calls serviceInit and start calls serviceStart
252: serviceInit start creates a whole bunch of event handlers that I don't what understand yet. There are also a bunch of "Dispatchers" who's only job is to call the handle method on the correct handler object.
1019: serviceStart starts calls createJob and startJobs
632: createJob constructs a JobImpl
1251: startJobs gets the ball rolling by generating the first startJobEvent and calls the handler.
JobImpl.java
967: handle method calls doTransition on the stateMachine
299: SetupCompletedTransition()
1560: scheduleTasks
958: Issue new task event
TaskImpl.java
157: InitialScheduleTransition
879: addAndScheduleAttempt
588: Creates new TaskAttempt which is a TaskAttemptImpl and send a TA_SCHEDULE event.
TaskAttemptImpl.java
StateMachineFactory.java
This describes how the state machine works internally. What is important to know is that the addTransition method takes the preState, postState, eventType, and transition. The transition is an object whose transition method will be called during the doTransition method (called by the event handler).
Hadoop MapReduce State Machine Redundancy
StateMachine | Total Transitions | Distinct Transitions | # Duplicate / # Distinct |
---|---|---|---|
JobImpl | 82 | 27 | 50/7 |
TaskImpl | 24 | 16 | 6/3 |
TaskAttemptImpl | 57 | 15 | 41/8 |
Total | 163 | 58 | 97/18 |
JobImpl
Count | Transition | Trigger (Event/State) | Comment |
---|---|---|---|
12 | DIAGNOSTIC_UPDATE_TRANSITION | JobEventType.JOB_DIAGNOSTIC_UPDATE | Same for most named states (JobImpl) |
14 | COUNTER_UPDATE_TRANSITION | JobEventType.JOB_COUNTER_UPDATE | |
13 | INTERNAL_ERROR_TRANSITION | JobEventType.INTERNAL_ERROR | InternalErrorTransition extends InternalTerminationTransition by setting the error into the history string. |
5 | INTERNAL_REBOOT_TRANSITION | JobEventType.JOB_AM_REBOOT | InternalRebootTransition extends InternalTerminationTransition by setting the error in the history string. |
2 | TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION | JobEventType.JOB_TASK_ATTEMPT_COMPLETED | |
2 | KilledDuringAbortTransition() | JobEventType.JOB_KILL | From FAIL_WAIT and FAIL_ABORT |
2 | JobAbortCompletedTransition() | JobEventType.JOB_ABORT_COMPLETED | From FAIL_ABORT and KILL_ABORT |
82 total transitions including "ignore" transitions. 50 are accounted for by the above 7 transitions (6 if you consider the fact that INTERNAL_ERROR_TRANSITION and INTERNAL_REBOOT_TRANSITION are almost exactly the same). There are 27 distinct named transitions.
List Candidates Based on events
JobEventType.JOB_KILL
KillNewJobTransition
KillInitedJobTransition
KilledDuringSetupTransition
KillTasksTransition
KilledDuringCommitTransition
KilledDuringAbortTransition
JobAbortCompletedTransition and KilledDuringAbortTransition do roughly the same thing calling unsuccessfulFinish with in turn issues some JobHistoryEvent then calls finished. Finished is called by KillNewJobTransition and CommitSucceededTransition.
Here there are 9 transitions that do small variations on dying. Although they are all a little bit different I'm not convinced the cleanup is really as complicated as they are making it.
private static class KilledDuringSetupTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.metrics.endRunningJob(job); job.addDiagnostic("Job received kill in SETUP state."); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); } } private static class CommitFailedTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event; job.addDiagnostic("Job commit failed: " + jcfe.getMessage()); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } } private static class KillInitedJobTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.addDiagnostic("Job received Kill in INITED state."); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); } } private static class KillWaitTaskCompletedTransition extends TaskCompletedTransition { @Override protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { if (job.completedTaskCount == job.tasks.size()) { job.setFinishTime(); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); return JobStateInternal.KILL_ABORT; } //return the current state, Job not finished yet return job.getInternalState(); } } private static class KilledDuringCommitTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.setFinishTime(); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); } } // Catch of InitTransaction } catch (IOException e) { LOG.warn("Job init failed", e); job.metrics.endPreparingJob(job); job.addDiagnostic("Job init failed : " + StringUtils.stringifyException(e)); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); return JobStateInternal.FAILED; } private static class SetupFailedTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.metrics.endRunningJob(job); job.addDiagnostic("Job setup failed : " + ((JobSetupFailedEvent) event).getMessage()); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } } //This transition happens when a job is to be failed. It waits for all the //tasks to finish / be killed. private static class JobFailWaitTransition implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { @Override public JobStateInternal transition(JobImpl job, JobEvent event) { if(!job.failWaitTriggerScheduledFuture.isCancelled()) { for(Task task: job.tasks.values()) { if(!task.isFinished()) { return JobStateInternal.FAIL_WAIT; } } } //Finished waiting. All tasks finished / were killed job.failWaitTriggerScheduledFuture.cancel(false); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); return JobStateInternal.FAIL_ABORT; } } //This transition happens when a job to be failed times out while waiting on //tasks that had been sent the KILL signal. It is triggered by a //ScheduledFuture task queued in the executor. private static class JobFailWaitTimedOutTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." + " Going to fail job anyway"); job.failWaitTriggerScheduledFuture.cancel(false); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } }
TaskImpl
Count | Transition | Trigger | Comment |
---|---|---|---|
2 | KILL_TRANSITION | TaskEventType.T_KILL | |
2 | ATTEMPT_KILLED_TRANSITION | TaskEventType.T_ATTEMPT_KILLED | |
2 | AttemptFailedTransition() | TaskEventType.T_ATTEMPT_FAILED |
24 total transitions include "ignore" transitions. There are 16 named transitions 6 transitions are accounted for by 3 named transitions.
KillWaitAttemptSucceededTransition and KillWaitAttemptFailedTransition are essentially the same both being trivial subclasses of KillWaitAttemptKilledTransition (3->1).
The majority of RetroactiveKilledTransition and RetroactiveFailureTransition is cleanup code for with they are ALMOST copy and pasted. RetroactiveFailureTransition is actually a subclass of AttemptFailedTransition and the super's transition is actually called (3->2).
InitialScheduleTransition and RedundantScheduleTransition look almost the same less some special handling, I can imagine a system where this is not necessary (2->1.5).
TaskAttemptImpl
Count | Transition | Trigger | Comment |
---|---|---|---|
2 | RequestContainerTransition() | TaskAttemptEventType.TA_SCHEDULE TaskAttemptEventType.TA_RESCHEDULE | A bool is passed to the constructor to do cleanup in one case and no the other |
4 | KilledTransition() | TaskAttemptEventType.TA_KILL TaskAttemptEventType.TA_CONTAINER_CLEANED TaskAttemptEventType.TA_CLEANUP_DONE | |
2 | FailedTransition() | TaskAttemptEventType.TA_FAILMSG TaskAttemptEventType.TA_CLEANUP_DONE | |
13 | DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION | TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE | |
3 | DeallocateContainerTransition() | TaskAttemptEventType.TA_KILL TaskAttemptEventType.TA_FAILMSG TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED | Constructor takes a bool that will "send event to speculator that we withdraw our container needs, if we're transitioning out of UNASSIGNED" |
13 | CLEANUP_CONTAINER_TRANSITION | TaskAttemptEventType.TA_CONTAINER_COMPLETED TaskAttemptEventType.TA_KILL TaskAttemptEventType.TA_FAILMSG TaskAttemptEventType.TA_DONE TaskAttemptEventType.TA_TIMED_OUT | |
2 | StatusUpdater() | TaskAttemptEventType.TA_UPDATE | |
2 | TaskCleanupTransition() | TaskAttemptEventType.TA_CONTAINER_CLEANED |
57 transitions included "ignore" transitions. 41 transitions accounted for by 8 named transitions.
Additional Notes:
- Event "ignore" lists is necessary in this implementation for events that come in that shouldn't affect the state machine.
No support list in Python MapReduce
- Diagnostics/Logs
- Counters
- Doesn't support reboot