DCFT Paper Notes

2/17

  • Collin is continuing with the Map-Reduce implementation
    • Unclear if rules-based code will provide full benefit
    • Most "faults" seem to arise "synchronously" so far in his case, so he could handle them by propagating them to the caller rather than having to poll for failures (see the sketch after this list).
  • TODO(stutsman): Move Survey of DCFT Systems up to Section 2; start writing up the Replica Manager in the paper draft
    • Concise, but complete description
    • No need to give reasons for details (e.g., segments have ordering constraints; don't explain why)
    • Eventually section will want several other short examples
      • Some from in and out of RAMCloud
    • John will do a write-up of the ServerList
  • Interesting point to try to work in, based on Sparrow feedback:
    • Everyone starts without handling faults
    • The hope is that adding fault-handling is just a matter of adding try/catch.
    • It turns out it has resulted in a complete rework for us a few times.
  • Another point: want to avoid rules-based programming whenever possible
    • It's less efficient, and it's hard.
    • That's one reason why it's great that it's easy to intermix with threaded/imperative code.
    • In several places in RAMCloud we contain the fault-handling complexity within a subsystem, and then the rest of the code that interacts with that subsystem stays simple.
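
A minimal sketch of the synchronous-fault point above (hypothetical names, not Collin's actual code): when a fault surfaces at the call site, an exception carries it straight to code that can react, with no polling rule needed.

 // Hypothetical illustration: a fault that arises synchronously is visible
 // at the call site, so it can be propagated with an exception instead of
 // being polled for by a rule.
 public class SynchronousFaultSketch {
     static class WorkerFailedException extends Exception {
         WorkerFailedException(String msg) { super(msg); }
     }

     // The fault is detected while executing on behalf of the caller, so
     // throwing is enough; no failure flag or polling rule is needed.
     static void assignTask(boolean workerAlive) throws WorkerFailedException {
         if (!workerAlive) {
             throw new WorkerFailedException("worker died during assignment");
         }
         // ... hand the task to the worker ...
     }

     public static void main(String[] args) {
         try {
             assignTask(false);
         } catch (WorkerFailedException e) {
             // The fault propagated synchronously; react here (e.g. pick
             // another worker) instead of noticing it later via polling.
             System.out.println("reassigning: " + e.getMessage());
         }
     }
 }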

2/10 Meeting - Conversation about Actors

Similarities to Actors

  • Prescriptive, constrained approach (even more so)
  • Sacrifice some efficiency for ease of reasoning
  • Rules -> Message Handlers (Behaviors)
  • Tasks -> Actors
  • Pools -> Pools (called "Frameworks" in some Actor systems)
  • Both use messages to notify of failures

Differences to Actors

  • Constrained so one message is handled per Actor at a time
  • Shared state is disallowed, except when manipulated explicitly via messages
  • Actors react to messages, not state
    • Could convert messages to state, then fire a message to induce a rule recheck (sketched after this list)
  • Problem: often want to trigger rules when no state updates (messages) are left
    • May be solved by doing everything with actors?
  • Priority inversion issues on messages
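
A rough sketch of that conversion (hypothetical, plain Java rather than any particular Actor framework): each message only updates state, and every delivery is followed by a full rule recheck.

 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;

 // Hypothetical sketch of "convert messages to state, then recheck rules":
 // incoming messages only mutate state, and every delivery triggers a full
 // rule re-evaluation, approximating rules on top of an actor.
 class RuleCheckingActor implements Runnable {
     enum Msg { SERVER_UP, SERVER_DOWN }

     private final BlockingQueue<Msg> mailbox = new LinkedBlockingQueue<>();
     private boolean serverAlive = false;   // state derived from messages
     private boolean taskOutstanding = true;

     void send(Msg m) { mailbox.add(m); }

     @Override
     public void run() {
         try {
             while (taskOutstanding) {
                 Msg m = mailbox.take();          // one message at a time
                 if (m == Msg.SERVER_UP) serverAlive = true;
                 if (m == Msg.SERVER_DOWN) serverAlive = false;
                 applyRules();                    // recheck ALL rules after
             }                                    // every state change
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
     }

     private void applyRules() {
         // Rule: if the server is alive and work remains, issue it.
         if (serverAlive && taskOutstanding) {
             System.out.println("issuing task");
             taskOutstanding = false;
         }
         // The problem noted in the list above: if no further message ever
         // arrives, nothing triggers another recheck.
     }
 }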

Sound Bytes

  • Rules-based code is internally synchronous, but the system is externally asynchronous. We would like a synchronous world, so we convert the asynchronous world into something synchronous. But there are limits, which is why rules have to stay small.
  • Actors: everything is asynchronous all the time; can't even get safe access to shared state.
  • Question that arose: what's really the best case for Actor performance with all of the message passing?
    • Is there an efficient Actor implementation?
    • stutsman: Probably worth looking at the ('06?) Scala actor paper.
  • TODO(stutsman): Read the Lauer and Needham duality paper
  • TODO(stutsman): Read up a bit on Actors; Hewitt papers of 60s, 70s
    • The more recent canonical discussion seems to be the Agha dissertation
    • Looked through that quite a bit and pulled a few interesting bits out of it (see below)

Fragments from Agha Dissertation

  • Several observations are in order here. Firstly, the behavior of an actor can be history sensitive. Secondly, there is no presumed sequentiality in the actions an actor performs since, mathematically, each of its actions is a function of the actor's behavior and the incoming communication.
  • The shared variables approach does not provide any mechanism for abstraction and information hiding. For instance, there must be predetermined protocols so that one process can determine if another has written the results it needs into the relevant variables.
  • Buffered asynchronous communication affords us efficiency in execution by pipelining the actions to be performed.

1/27 Meeting 

TODO

  • Work up an outline of the paper; send around (stutsman)
  • Gather data
    • Fix TableManager in RAMCloud (stutsman)
    • Redo HDFS
      • Block replication management on NameNode
      • And/Or DataNode to DataNode block replication
    • Does Sparrow have DCFT? Could parts of it be reworked?
    • Need a strong non-RAMCloud example
      • One possibility: a system that already (implicitly) uses rules.
      • Another possibility: show a system that's a mess and show how rules clean it up.

Sound Bytes

  • Think in terms of rules rather than algorithms
    • How does that differ from thinking in terms of threads or in terms of events?
  • Claim: we don't believe a problem this hard/diverse has a one-size-fits-all solution; hence, we've tried to extract the core concepts. Implementations of the concepts will look different under different contexts and requirements. The power is in the fact that the same approach can be applied to all fault-tolerant code regardless of platform, programming language, network/protocols, or even the specific types of faults that occur.
  • Position strongly as a concept/experience paper
    • Probably want the word "Experience" in the title like the monitors paper
    • "Experiences with Rules-driven Programming for Fault-tolerant Systems" etc.
  • Claim: several modules in RAMCloud were first attempted in a "traditional" threaded style, and they ended up needing to be reworked. Often, there was a tendency to write really simple looping, blocking, or locking code somewhere deep in places where it looked like it didn't matter (success seemed guaranteed, locks seemed short, etc.). These were a constant thorn. We should try to list as many as we can think of.
    • BackupSelector was an offender on many occasions. Each time we'd make it a little better, without going the whole way, and each time we'd have to come back and fix it up.
      • One of the constant temptations was to "spin" in the selector waiting for new backups to get added to the list. But this prevented higher-level error handling cases from kicking in.
    • TableManager
    • BackupRecoveryManager
    • ServerList management
    • (add more as we think of them)
  • Recomputing what to do next from scratch is the key to making things easier to reason about; relying on the program counter is efficient, but it's burdened with assumptions that make it difficult to use effectively (see the sketch after this list).
    • The rules-based approach is unapologetically less efficient.
    • On the other hand, we've used it to great effect in our high-performance system.
  • Rules become really critical when failure handling may itself involve failure handling, and even more so when failure handling may be recursive, mutually recursive, or iterative.
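
A minimal sketch of this recompute-from-scratch style (hypothetical names, loosely shaped like a replica manager; not RAMCloud's actual code):

 // Hypothetical sketch of rules that recompute the next action from current
 // state alone; there is no program counter to resume, so a failure handler
 // just updates state and reruns the rules.
 public class ReplicationTaskSketch {
     int replicasNeeded = 3;
     int replicasDurable = 3;
     boolean rpcOutstanding = false;

     // Invoked whenever state may have changed (RPC completed, server died).
     void applyRules() {
         // Rule: if under-replicated and no repair RPC is in flight, start one.
         if (!rpcOutstanding && replicasDurable < replicasNeeded) {
             rpcOutstanding = true;
             System.out.println("starting re-replication RPC");
         }
     }

     void onServerFailure() {
         replicasDurable--;   // the handler only updates state...
         applyRules();        // ...then lets the rules decide what to do
     }

     public static void main(String[] args) {
         new ReplicationTaskSketch().onServerFailure();
     }
 }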

Collin's Random Notes

WIP distillation of thoughts on DCFT.

Python MapReduce Notes

Interesting that my implementation also had three levels: Master/Scheduler (Job), TaskWrapper (Task), and Task (TaskAttempt).  I used a "nested rules" approach, but in reality it likely could have been collapsed into one top-level rule set.  I should mention there is a great deal of similarity between a MapTask and a ReduceTask in terms of their rules.  How do you modularize Task classes?  Can you subclass a Task class?  (One possible answer is sketched after the table below.)  The table below does not include the RPC rules.  It also does not account for the rules for the membership service (though that likely should be a different module).

Module | Rule Count | Comments
Master/Scheduler | 1 | Basically just the rule to prevent deadlock by preempting tasks
MapTaskSet | 2 | For straggler reissue.
MapTask/Wrapper | 5 |
ReduceTaskSet | 0 | Just pool management.
ReduceTask | 4 |
Total | 12 | The server failure "event" rule is not included (i.e. the handler that sets the isAlive bit is not counted as a rule). "Event" rules for setting the status of an RPC are also not included. The RPC status is considered a state field.
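
On the modularization question above: the implementation is Python, but here is one possible shape (hypothetical names, sketched in Java for consistency with the rest of these notes): shared rules live in a base Task class and each subclass contributes its own.

 // Hypothetical sketch: common rules in a base class, map-specific rules in
 // a subclass. Not the actual Python code.
 abstract class TaskSketch {
     boolean assigned = false;
     boolean done = false;

     // Rule shared by map and reduce tasks: grab a worker when unassigned.
     void applyCommonRules() {
         if (!assigned && !done) {
             assigned = tryAssignWorker();
         }
     }

     abstract boolean tryAssignWorker();
     abstract void applyRules();   // subclasses combine common + own rules
 }

 class MapTaskSketch extends TaskSketch {
     boolean outputReported = false;

     @Override boolean tryAssignWorker() { return true; /* stub */ }

     @Override void applyRules() {
         applyCommonRules();
         // Map-specific rule: publish output location once the task is done.
         if (done && !outputReported) {
             outputReported = true;
         }
     }
 }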


Hadoop MapReduce Walkthrough

MRAppMaster.java

1354: main starts here

1385: MRAppMaster constructed as appMaster

1409: initAndStartAppMaster called on appMaster

1450: initAndStartAppMaster starts here

1479: init and start called on appMaster; MRAppMaster is a CompositeService, which is an AbstractService; init calls serviceInit and start calls serviceStart

252: serviceInit starts here; it creates a whole bunch of event handlers that I don't understand yet.  There are also a bunch of "Dispatchers" whose only job is to call the handle method on the correct handler object.
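
A condensed sketch of that dispatcher pattern (simplified; not Hadoop's actual dispatcher classes): the dispatcher's only job is to route an event to the handler registered for its type and call handle on it.

 import java.util.HashMap;
 import java.util.Map;

 // Simplified sketch of an event dispatcher: register() a handler per event
 // type; dispatch() looks the handler up and calls handle on it.
 class MiniDispatcher {
     interface Event { Enum<?> getType(); }
     interface Handler { void handle(Event e); }

     private final Map<Enum<?>, Handler> handlers = new HashMap<>();

     void register(Enum<?> eventType, Handler h) {
         handlers.put(eventType, h);
     }

     void dispatch(Event e) {
         Handler h = handlers.get(e.getType());
         if (h != null) {
             h.handle(e);   // the dispatcher's only job
         }
     }
 }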

1019: serviceStart starts here; it calls createJob and startJobs

632: createJob constructs a JobImpl

1251: startJobs gets the ball rolling by generating the first startJobEvent and calling the handler.

JobImpl.java

967: handle method calls doTransition on the stateMachine

299: SetupCompletedTransition()

1560: scheduleTasks

958: Issue new task event

TaskImpl.java

157: InitialScheduleTransition

879: addAndScheduleAttempt

588: Creates a new TaskAttempt, which is a TaskAttemptImpl, and sends a TA_SCHEDULE event.

TaskAttemptImpl.java


StateMachineFactory.java

This describes how the state machine works internally.  What is important to know is that the addTransition method takes the preState, postState, eventType, and transition.  The transition is an object whose transition method will be called during the doTransition method (called by the event handler).
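
To make the mechanism concrete, here is a self-contained imitation of the API described above (NOT Hadoop's actual StateMachineFactory; names and details are simplified):

 import java.util.HashMap;
 import java.util.Map;

 // Imitation of the described API: addTransition registers
 // (preState, eventType) -> (postState, hook); doTransition runs the hook
 // and moves to postState.
 class TinyStateMachine<STATE extends Enum<STATE>, EVENT extends Enum<EVENT>> {
     interface Transition<S, E> { void transition(S preState, E event); }

     private static class Arc<S, E> {
         final S post; final Transition<S, E> hook;
         Arc(S post, Transition<S, E> hook) { this.post = post; this.hook = hook; }
     }

     private final Map<String, Arc<STATE, EVENT>> arcs = new HashMap<>();
     private STATE current;

     TinyStateMachine(STATE initial) { current = initial; }

     TinyStateMachine<STATE, EVENT> addTransition(
             STATE pre, STATE post, EVENT eventType, Transition<STATE, EVENT> hook) {
         arcs.put(pre + "/" + eventType, new Arc<>(post, hook));
         return this;   // chainable, like the factory in Hadoop
     }

     // Called by the event handler; runs the hook, then updates the state.
     STATE doTransition(EVENT event) {
         Arc<STATE, EVENT> arc = arcs.get(current + "/" + event);
         if (arc == null) {
             throw new IllegalStateException("no arc for " + current + "/" + event);
         }
         arc.hook.transition(current, event);
         current = arc.post;
         return current;
     }
 }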

Hadoop MapReduce State Machine Redundancy

State Machine | Total Transitions | Distinct Transitions | # Duplicate / # Distinct
JobImpl | 82 | 27 | 50/7
TaskImpl | 24 | 16 | 6/3
TaskAttemptImpl | 57 | 15 | 41/8
Total | 163 | 58 | 97/18


JobImpl

Count | Transition | Trigger (Event/State) | Comment
12 | DIAGNOSTIC_UPDATE_TRANSITION | JobEventType.JOB_DIAGNOSTIC_UPDATE | Same for most named states (JobImpl)
14 | COUNTER_UPDATE_TRANSITION | JobEventType.JOB_COUNTER_UPDATE |
13 | INTERNAL_ERROR_TRANSITION | JobEventType.INTERNAL_ERROR | InternalErrorTransition extends InternalTerminationTransition by setting the error into the history string.
5 | INTERNAL_REBOOT_TRANSITION | JobEventType.JOB_AM_REBOOT | InternalRebootTransition extends InternalTerminationTransition by setting the error in the history string.
2 | TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION | JobEventType.JOB_TASK_ATTEMPT_COMPLETED |
2 | KilledDuringAbortTransition() | JobEventType.JOB_KILL | From FAIL_WAIT and FAIL_ABORT
2 | JobAbortCompletedTransition() | JobEventType.JOB_ABORT_COMPLETED | From FAIL_ABORT and KILL_ABORT

82 total transitions, including "ignore" transitions.  50 are accounted for by the above 7 transitions (6 if you consider that INTERNAL_ERROR_TRANSITION and INTERNAL_REBOOT_TRANSITION are almost exactly the same).  There are 27 distinct named transitions.

Candidate transitions for consolidation, grouped by event

JobEventType.JOB_KILL

  • KillNewJobTransition
  • KillInitedJobTransition
  • KilledDuringSetupTransition
  • KillTasksTransition
  • KilledDuringCommitTransition
  • KilledDuringAbortTransition

JobAbortCompletedTransition and KilledDuringAbortTransition do roughly the same thing: both call unsuccessfulFinish, which in turn issues a JobHistoryEvent and then calls finished.  finished is also called by KillNewJobTransition and CommitSucceededTransition.


Here there are 9 transitions that do small variations on dying.  Although they are all a little bit different, I'm not convinced the cleanup is really as complicated as they are making it (see the consolidation sketch after these excerpts).

 private static class KilledDuringSetupTransition
     implements SingleArcTransition<JobImpl, JobEvent> {
   @Override
   public void transition(JobImpl job, JobEvent event) {
     job.metrics.endRunningJob(job);
     job.addDiagnostic("Job received kill in SETUP state.");
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext,
         org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
   }
 }

 private static class CommitFailedTransition implements
     SingleArcTransition<JobImpl, JobEvent> {
   @Override
   public void transition(JobImpl job, JobEvent event) {
     JobCommitFailedEvent jcfe = (JobCommitFailedEvent) event;
     job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext,
         org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   }
 }

 private static class KillInitedJobTransition
     implements SingleArcTransition<JobImpl, JobEvent> {
   @Override
   public void transition(JobImpl job, JobEvent event) {
     job.addDiagnostic("Job received Kill in INITED state.");
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext,
         org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
   }
 }

 private static class KillWaitTaskCompletedTransition extends
     TaskCompletedTransition {
   @Override
   protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
     if (job.completedTaskCount == job.tasks.size()) {
       job.setFinishTime();
       job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
           job.jobContext,
           org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
       return JobStateInternal.KILL_ABORT;
     }
     // return the current state, Job not finished yet
     return job.getInternalState();
   }
 }

 private static class KilledDuringCommitTransition implements
     SingleArcTransition<JobImpl, JobEvent> {
   @Override
   public void transition(JobImpl job, JobEvent event) {
     job.setFinishTime();
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext,
         org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
   }
 }

 // Catch block of InitTransition
 } catch (IOException e) {
   LOG.warn("Job init failed", e);
   job.metrics.endPreparingJob(job);
   job.addDiagnostic("Job init failed : "
       + StringUtils.stringifyException(e));
   job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
       job.jobContext,
       org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   return JobStateInternal.FAILED;
 }

 private static class SetupFailedTransition
     implements SingleArcTransition<JobImpl, JobEvent> {
   @Override
   public void transition(JobImpl job, JobEvent event) {
     job.metrics.endRunningJob(job);
     job.addDiagnostic("Job setup failed : "
         + ((JobSetupFailedEvent) event).getMessage());
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext,
         org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   }
 }

 // This transition happens when a job is to be failed. It waits for all the
 // tasks to finish / be killed.
 private static class JobFailWaitTransition
     implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
   @Override
   public JobStateInternal transition(JobImpl job, JobEvent event) {
     if (!job.failWaitTriggerScheduledFuture.isCancelled()) {
       for (Task task : job.tasks.values()) {
         if (!task.isFinished()) {
           return JobStateInternal.FAIL_WAIT;
         }
       }
     }
     // Finished waiting. All tasks finished / were killed
     job.failWaitTriggerScheduledFuture.cancel(false);
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
     return JobStateInternal.FAIL_ABORT;
   }
 }

 // This transition happens when a job to be failed times out while waiting on
 // tasks that had been sent the KILL signal. It is triggered by a
 // ScheduledFuture task queued in the executor.
 private static class JobFailWaitTimedOutTransition
     implements SingleArcTransition<JobImpl, JobEvent> {
   @Override
   public void transition(JobImpl job, JobEvent event) {
     LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
         + " Going to fail job anyway");
     job.failWaitTriggerScheduledFuture.cancel(false);
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   }
 }


TaskImpl

Count | Transition | Trigger | Comment
2 | KILL_TRANSITION | TaskEventType.T_KILL |
2 | ATTEMPT_KILLED_TRANSITION | TaskEventType.T_ATTEMPT_KILLED |
2 | AttemptFailedTransition() | TaskEventType.T_ATTEMPT_FAILED |

24 total transitions, including "ignore" transitions.  There are 16 named transitions; 6 transitions are accounted for by 3 named transitions.

KillWaitAttemptSucceededTransition and KillWaitAttemptFailedTransition are essentially the same, both being trivial subclasses of KillWaitAttemptKilledTransition (3->1).

The majority of RetroactiveKilledTransition and RetroactiveFailureTransition is cleanup code, for which they are ALMOST copy-and-pasted.  RetroactiveFailureTransition is actually a subclass of AttemptFailedTransition, and the super's transition is actually called (3->2).

InitialScheduleTransition and RedundantScheduleTransition look almost the same aside from some special handling; I can imagine a system where this is not necessary (2->1.5).

TaskAttemptImpl

Count | Transition | Trigger | Comment
2 | RequestContainerTransition() | TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_RESCHEDULE | A bool is passed to the constructor to do cleanup in one case and not the other
4 | KilledTransition() | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CLEANUP_DONE |
2 | FailedTransition() | TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CLEANUP_DONE |
13 | DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION | TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE |
3 | DeallocateContainerTransition() | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED | Constructor takes a bool that will "send event to speculator that we withdraw our container needs, if we're transitioning out of UNASSIGNED"
13 | CLEANUP_CONTAINER_TRANSITION | TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_TIMED_OUT |
2 | StatusUpdater() | TaskAttemptEventType.TA_UPDATE |
2 | TaskCleanupTransition() | TaskAttemptEventType.TA_CONTAINER_CLEANED |

57 total transitions, including "ignore" transitions.  41 are accounted for by the above 8 named transitions.

Additional Notes:

  • Event "ignore" lists is necessary in this implementation for events that come in that shouldn't affect the state machine.

Things the Python MapReduce implementation does not support

  • Diagnostics/Logs
  • Counters
  • Reboot