Fragments from Agha Dissertation
WIP distillation of thoughts on DCFT.
Interesting that my implementation also had three levels: Master/Scheduler (Job), TaskWrapper (Task), and Task (TaskAttempt). I used a "nested rules" approach, but in reality it likely could have been collapsed into one top-level rule set. I should mention that there is a great deal of similarity between a MapTask and a ReduceTask in terms of their rules. How do you modularize Task classes? Can you subclass a Task class? The table below does not include the RPC rules. It also does not account for the rules for the membership service (though that likely should be a different module).
Module | Rule Count | Comments |
---|---|---|
Master/Scheduler | 1 | Basically just the rule to prevent deadlock by preempting tasks |
MapTaskSet | 2 | For straggler reissue. |
MapTask/Wrapper | 5 | |
ReduceTaskSet | 0 | Just pool management. |
ReduceTask | 4 | |
Total | 12 | Server failure "event" rule is not included (i.e. the handler that sets the isAlive bit is not counted as a rule). "Event" rules for setting the status of an RPC are also not included. The RPC status is considered a state field. |
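The counting convention in the table (event handlers only set state fields, and only guards over that state count as rules) can be illustrated with a minimal sketch. Everything here is hypothetical: `Rule`, `RuleModule`, `ServerState`, the `taskReissued` field, and the rule name are illustrations only, not the implementation these notes describe. Only the `isAlive` bit comes from the notes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical per-server state. Only isAlive is named in the notes.
class ServerState {
    boolean isAlive = true;        // written by the failure event handler
    boolean taskReissued = false;  // written by the rule below
}

// A rule pairs a guard over state fields with an action on that state.
class Rule<S> {
    final String name;
    final Predicate<S> guard;
    final Consumer<S> action;

    Rule(String name, Predicate<S> guard, Consumer<S> action) {
        this.name = name;
        this.guard = guard;
        this.action = action;
    }
}

class RuleModule<S> {
    private final List<Rule<S>> rules = new ArrayList<>();

    void add(Rule<S> rule) { rules.add(rule); }

    // The number that would go in the "Rule Count" column.
    int ruleCount() { return rules.size(); }

    // Evaluate each rule once; return the names of the rules that fired.
    List<String> step(S state) {
        List<String> fired = new ArrayList<>();
        for (Rule<S> rule : rules) {
            if (rule.guard.test(state)) {
                rule.action.accept(state);
                fired.add(rule.name);
            }
        }
        return fired;
    }
}

class RuleCountSketch {
    // "Event" handler: it only sets a state field, so it is not counted as a rule.
    static void onServerFailure(ServerState s) { s.isAlive = false; }

    static RuleModule<ServerState> buildModule() {
        RuleModule<ServerState> m = new RuleModule<>();
        m.add(new Rule<ServerState>("reissueWhenServerDead",
                s -> !s.isAlive && !s.taskReissued,
                s -> s.taskReissued = true));
        return m;
    }

    public static void main(String[] args) {
        ServerState state = new ServerState();
        RuleModule<ServerState> module = buildModule();
        System.out.println(module.step(state)); // nothing fires while the server is alive
        onServerFailure(state);
        System.out.println(module.step(state)); // the rule fires after the bit is cleared
        System.out.println("rules counted: " + module.ruleCount());
    }
}
```

Under this convention the module above would contribute 1 to the table, even though two pieces of code touch the state.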
1354: main starts here
1385: MRAppMaster constructed as appMaster
1409: initAndStartAppMaster called on appMaster
1450: initAndStartAppMaster starts here
1479: appMaster init and start. MRAppMaster is a CompositeService, which is an AbstractService; init calls serviceInit and start calls serviceStart
252: serviceInit starts here and creates a whole bunch of event handlers that I don't understand yet. There are also a bunch of "Dispatchers" whose only job is to call the handle method on the correct handler object.
1019: serviceStart starts here and calls createJob and startJobs
632: createJob constructs a JobImpl
1251: startJobs gets the ball rolling by generating the first startJobEvent and calling the handler.
967: handle method calls doTransition on the stateMachine
299: SetupCompletedTransition()
1560: scheduleTasks
958: Issue new task event
157: InitialScheduleTransition
879: addAndScheduleAttempt
588: Creates a new TaskAttempt, which is a TaskAttemptImpl, and sends a TA_SCHEDULE event.
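The "Dispatchers" in the trace above route each event to the handler registered for its event type. A minimal sketch of that routing follows. All names here (`Event`, `Handler`, `SimpleDispatcher`, `JobEventKind`, `TaskEventKind`) are hypothetical stand-ins rather than Hadoop classes, and the sketch dispatches synchronously for brevity, which is an assumption and not a claim about how the real dispatcher is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event and handler types, loosely modeled on the trace above.
interface Event { Enum<?> type(); }
interface Handler { void handle(Event event); }

enum JobEventKind { JOB_START }
enum TaskEventKind { T_SCHEDULE }

class SimpleEvent implements Event {
    private final Enum<?> type;
    SimpleEvent(Enum<?> type) { this.type = type; }
    @Override public Enum<?> type() { return type; }
}

// The dispatcher's only job: find the handler for the event's enum class and call handle.
class SimpleDispatcher implements Handler {
    private final Map<Class<?>, Handler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, Handler handler) {
        handlers.put(eventTypeClass, handler);
    }

    @Override
    public void handle(Event event) {
        Handler target = handlers.get(event.type().getDeclaringClass());
        if (target == null) {
            throw new IllegalStateException("No handler registered for " + event.type());
        }
        target.handle(event);
    }
}

class DispatcherSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleDispatcher dispatcher = new SimpleDispatcher();
        dispatcher.register(JobEventKind.class, e -> log.add("job handler got " + e.type()));
        dispatcher.register(TaskEventKind.class, e -> log.add("task handler got " + e.type()));
        // The first event plays the role of the start-job event generated by startJobs.
        dispatcher.handle(new SimpleEvent(JobEventKind.JOB_START));
        dispatcher.handle(new SimpleEvent(TaskEventKind.T_SCHEDULE));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```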
This describes how the state machine works internally. What is important to know is that the addTransition method takes the preState, postState, eventType, and transition. The transition is an object whose transition method will be called during the doTransition method (called by the event handler).
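A minimal sketch of a table-driven state machine with the same four-argument `addTransition` shape may make this concrete. It is an assumption-laden illustration: `TinyStateMachine`, `Arc`, `DemoState`, and `DemoEventType` are hypothetical names, and the real factory has more overloads and features than shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hook object whose transition method runs while an arc is taken.
interface Transition<O, E> { void transition(O operand, E event); }

// One arc of the table: where to go and what to run on the way.
class Arc<O, S, E> {
    final S postState;
    final Transition<O, E> hook;
    Arc(S postState, Transition<O, E> hook) {
        this.postState = postState;
        this.hook = hook;
    }
}

class TinyStateMachine<O, S, T, E> {
    // preState -> eventType -> arc
    private final Map<S, Map<T, Arc<O, S, E>>> table = new HashMap<>();
    private final O operand;
    private S current;

    TinyStateMachine(O operand, S initial) {
        this.operand = operand;
        this.current = initial;
    }

    TinyStateMachine<O, S, T, E> addTransition(S preState, S postState, T eventType,
                                               Transition<O, E> transition) {
        table.computeIfAbsent(preState, k -> new HashMap<>())
             .put(eventType, new Arc<>(postState, transition));
        return this;
    }

    // Called by the event handler: look up the arc, run its hook, move to postState.
    S doTransition(T eventType, E event) {
        Map<T, Arc<O, S, E>> arcs = table.get(current);
        Arc<O, S, E> arc = (arcs == null) ? null : arcs.get(eventType);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + eventType + " in state " + current);
        }
        arc.hook.transition(operand, event);
        current = arc.postState;
        return current;
    }

    S getCurrentState() { return current; }
}

enum DemoState { NEW, SETUP, RUNNING }
enum DemoEventType { JOB_START, JOB_SETUP_COMPLETED }

class StateMachineSketch {
    static TinyStateMachine<StringBuilder, DemoState, DemoEventType, String> build(StringBuilder log) {
        return new TinyStateMachine<StringBuilder, DemoState, DemoEventType, String>(log, DemoState.NEW)
            .addTransition(DemoState.NEW, DemoState.SETUP, DemoEventType.JOB_START,
                (l, e) -> l.append("start:").append(e).append(';'))
            .addTransition(DemoState.SETUP, DemoState.RUNNING, DemoEventType.JOB_SETUP_COMPLETED,
                (l, e) -> l.append("setupDone:").append(e).append(';'));
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        TinyStateMachine<StringBuilder, DemoState, DemoEventType, String> sm = build(log);
        sm.doTransition(DemoEventType.JOB_START, "job_1");
        sm.doTransition(DemoEventType.JOB_SETUP_COMPLETED, "job_1");
        System.out.println(sm.getCurrentState() + " " + log);
    }
}
```

Because each arc holds a reference to a transition object, the same object can be registered on many arcs, which is what the duplicate counts in the tables below measure.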
StateMachine | Total Transitions | Distinct Transitions | # Duplicate / # Distinct |
---|---|---|---|
JobImpl | 82 | 27 | 50/7 |
TaskImpl | 24 | 16 | 6/3 |
TaskAttemptImpl | 57 | 15 | 41/8 |
Total | 163 | 58 | 97/18 |
JobImpl
Count | Transition | Trigger (Event/State) | Comment |
---|---|---|---|
12 | DIAGNOSTIC_UPDATE_TRANSITION | JobEventType.JOB_DIAGNOSTIC_UPDATE | Same for most named states (JobImpl) |
14 | COUNTER_UPDATE_TRANSITION | JobEventType.JOB_COUNTER_UPDATE | |
13 | INTERNAL_ERROR_TRANSITION | JobEventType.INTERNAL_ERROR | InternalErrorTransition extends InternalTerminationTransition by setting the error into the history string. |
5 | INTERNAL_REBOOT_TRANSITION | JobEventType.JOB_AM_REBOOT | InternalRebootTransition extends InternalTerminationTransition by setting the error in the history string. |
2 | TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION | JobEventType.JOB_TASK_ATTEMPT_COMPLETED | |
2 | KilledDuringAbortTransition() | JobEventType.JOB_KILL | From FAIL_WAIT and FAIL_ABORT |
2 | JobAbortCompletedTransition() | JobEventType.JOB_ABORT_COMPLETED | From FAIL_ABORT and KILL_ABORT |
82 total transitions including "ignore" transitions. 50 are accounted for by the above 7 transitions (6 if you consider the fact that INTERNAL_ERROR_TRANSITION and INTERNAL_REBOOT_TRANSITION are almost exactly the same). There are 27 distinct named transitions.
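The "50 accounted for by 7" figure is a tally over transition registrations. The sketch below reproduces it from the counts in the JobImpl table. The `TransitionTally` class and its method names are hypothetical, and the single `SetupCompletedTransition` entry is added only to show that transitions registered once are filtered out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TransitionTally {
    // Count how many times each transition name appears in a list of registrations.
    static Map<String, Integer> countByName(List<String> registrations) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String name : registrations) {
            counts.merge(name, 1, Integer::sum);
        }
        return counts;
    }

    // Keep only the transitions that are registered more than once.
    static Map<String, Integer> shared(Map<String, Integer> counts) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    static int total(Map<String, Integer> counts) {
        int sum = 0;
        for (int c : counts.values()) {
            sum += c;
        }
        return sum;
    }

    // The JobImpl table rows, expanded into one list entry per registration.
    static List<String> jobImplRegistrations() {
        List<String> regs = new ArrayList<>();
        regs.addAll(Collections.nCopies(12, "DIAGNOSTIC_UPDATE_TRANSITION"));
        regs.addAll(Collections.nCopies(14, "COUNTER_UPDATE_TRANSITION"));
        regs.addAll(Collections.nCopies(13, "INTERNAL_ERROR_TRANSITION"));
        regs.addAll(Collections.nCopies(5, "INTERNAL_REBOOT_TRANSITION"));
        regs.addAll(Collections.nCopies(2, "TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION"));
        regs.addAll(Collections.nCopies(2, "KilledDuringAbortTransition"));
        regs.addAll(Collections.nCopies(2, "JobAbortCompletedTransition"));
        regs.add("SetupCompletedTransition"); // registered once, so it is not "shared"
        return regs;
    }

    public static void main(String[] args) {
        Map<String, Integer> sharedCounts = shared(countByName(jobImplRegistrations()));
        // Same format as the "# Duplicate / # Distinct" column.
        System.out.println(total(sharedCounts) + "/" + sharedCounts.size()); // prints 50/7
    }
}
```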
JobEventType.JOB_KILL
KillNewJobTransition
KillInitedJobTransition
KilledDuringSetupTransition
KillTasksTransition
KilledDuringCommitTransition
KilledDuringAbortTransition
JobAbortCompletedTransition and KilledDuringAbortTransition do roughly the same thing: each calls unsuccessfulFinish, which in turn issues a JobHistoryEvent and then calls finished. finished is also called by KillNewJobTransition and CommitSucceededTransition.
Here there are 9 transitions that do small variations on dying. Although they are all a little bit different, I'm not convinced the cleanup is really as complicated as they are making it.
    private static class KilledDuringSetupTransition
        implements SingleArcTransition<JobImpl, JobEvent> {
      @Override
      public void transition(JobImpl job, JobEvent event) {
        job.metrics.endRunningJob(job);
        job.addDiagnostic("Job received kill in SETUP state.");
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
      }
    }

    private static class CommitFailedTransition
        implements SingleArcTransition<JobImpl, JobEvent> {
      @Override
      public void transition(JobImpl job, JobEvent event) {
        JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event;
        job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
      }
    }

    private static class KillInitedJobTransition
        implements SingleArcTransition<JobImpl, JobEvent> {
      @Override
      public void transition(JobImpl job, JobEvent event) {
        job.addDiagnostic("Job received Kill in INITED state.");
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
      }
    }

    private static class KillWaitTaskCompletedTransition
        extends TaskCompletedTransition {
      @Override
      protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
        if (job.completedTaskCount == job.tasks.size()) {
          job.setFinishTime();
          job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
              job.jobContext,
              org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
          return JobStateInternal.KILL_ABORT;
        }
        //return the current state, Job not finished yet
        return job.getInternalState();
      }
    }

    private static class KilledDuringCommitTransition
        implements SingleArcTransition<JobImpl, JobEvent> {
      @Override
      public void transition(JobImpl job, JobEvent event) {
        job.setFinishTime();
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
      }
    }

    // Catch block of InitTransition (fragment)
    } catch (IOException e) {
      LOG.warn("Job init failed", e);
      job.metrics.endPreparingJob(job);
      job.addDiagnostic("Job init failed : "
          + StringUtils.stringifyException(e));
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
      return JobStateInternal.FAILED;
    }

    private static class SetupFailedTransition
        implements SingleArcTransition<JobImpl, JobEvent> {
      @Override
      public void transition(JobImpl job, JobEvent event) {
        job.metrics.endRunningJob(job);
        job.addDiagnostic("Job setup failed : "
            + ((JobSetupFailedEvent) event).getMessage());
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
      }
    }

    //This transition happens when a job is to be failed. It waits for all the
    //tasks to finish / be killed.
    private static class JobFailWaitTransition
        implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
      @Override
      public JobStateInternal transition(JobImpl job, JobEvent event) {
        if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
          for(Task task: job.tasks.values()) {
            if(!task.isFinished()) {
              return JobStateInternal.FAIL_WAIT;
            }
          }
        }
        //Finished waiting. All tasks finished / were killed
        job.failWaitTriggerScheduledFuture.cancel(false);
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
        return JobStateInternal.FAIL_ABORT;
      }
    }

    //This transition happens when a job to be failed times out while waiting on
    //tasks that had been sent the KILL signal. It is triggered by a
    //ScheduledFuture task queued in the executor.
    private static class JobFailWaitTimedOutTransition
        implements SingleArcTransition<JobImpl, JobEvent> {
      @Override
      public void transition(JobImpl job, JobEvent event) {
        LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
            + " Going to fail job anyway");
        job.failWaitTriggerScheduledFuture.cancel(false);
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
      }
    }
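Several of the transitions quoted above share one shape: optional bookkeeping, an optional fixed diagnostic, then a CommitterJobAbortEvent carrying KILLED or FAILED. As a rough illustration of how that shape could be collapsed, here is a sketch of a single parameterized transition. It is hypothetical: `JobStub`, `FinalState`, `AbortTransition`, and `AbortSketch` are stand-ins invented for this example, not Hadoop classes, and the `abortEvents` list stands in for `job.eventHandler.handle(new CommitterJobAbortEvent(...))`.

```java
import java.util.ArrayList;
import java.util.List;

enum FinalState { KILLED, FAILED }

// Stand-in for JobImpl with only the fields this sketch touches.
class JobStub {
    final List<String> diagnostics = new ArrayList<>();
    final List<FinalState> abortEvents = new ArrayList<>(); // stands in for the abort event
    boolean runningEnded = false;   // mirrors job.metrics.endRunningJob(job)
    boolean finishTimeSet = false;  // mirrors job.setFinishTime()
}

// One parameterized transition for the "bookkeeping, diagnostic, abort" shape.
class AbortTransition {
    private final FinalState finalState;
    private final String diagnostic;      // null means "add no diagnostic"
    private final boolean endRunning;
    private final boolean setFinishTime;

    AbortTransition(FinalState finalState, String diagnostic,
                    boolean endRunning, boolean setFinishTime) {
        this.finalState = finalState;
        this.diagnostic = diagnostic;
        this.endRunning = endRunning;
        this.setFinishTime = setFinishTime;
    }

    void transition(JobStub job) {
        if (endRunning) job.runningEnded = true;
        if (setFinishTime) job.finishTimeSet = true;
        if (diagnostic != null) job.diagnostics.add(diagnostic);
        job.abortEvents.add(finalState);
    }
}

class AbortSketch {
    // Three of the quoted transitions expressed as configurations of one class.
    static final AbortTransition KILLED_DURING_SETUP =
        new AbortTransition(FinalState.KILLED, "Job received kill in SETUP state.", true, false);
    static final AbortTransition KILL_INITED =
        new AbortTransition(FinalState.KILLED, "Job received Kill in INITED state.", false, false);
    static final AbortTransition KILLED_DURING_COMMIT =
        new AbortTransition(FinalState.KILLED, null, false, true);

    public static void main(String[] args) {
        JobStub job = new JobStub();
        KILLED_DURING_SETUP.transition(job);
        System.out.println(job.diagnostics + " " + job.abortEvents);
    }
}
```

The sketch does not cover everything quoted above. SetupFailedTransition and CommitFailedTransition build their diagnostic from the event, so they would need a message function instead of a fixed string, and KillWaitTaskCompletedTransition and JobFailWaitTransition choose their next state conditionally.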
TaskImpl
Count | Transition | Trigger | Comment |
---|---|---|---|
2 | KILL_TRANSITION | TaskEventType.T_KILL | |
2 | ATTEMPT_KILLED_TRANSITION | TaskEventType.T_ATTEMPT_KILLED | |
2 | AttemptFailedTransition() | TaskEventType.T_ATTEMPT_FAILED | |
24 total transitions, including "ignore" transitions. There are 16 named transitions; 6 transitions are accounted for by 3 named transitions.
KillWaitAttemptSucceededTransition and KillWaitAttemptFailedTransition are essentially the same, both being trivial subclasses of KillWaitAttemptKilledTransition (3->1).
The majority of RetroactiveKilledTransition and RetroactiveFailureTransition is cleanup code, which is ALMOST copied and pasted between the two. RetroactiveFailureTransition is actually a subclass of AttemptFailedTransition, and the super's transition is actually called (3->2).
InitialScheduleTransition and RedundantScheduleTransition look almost the same, less some special handling. I can imagine a system where this is not necessary (2->1.5).
TaskAttemptImpl
Count | Transition | Trigger | Comment |
---|---|---|---|
2 | RequestContainerTransition() | TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_RESCHEDULE | A bool is passed to the constructor to do cleanup in one case and not the other |
4 | KilledTransition() | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CLEANUP_DONE | |
2 | FailedTransition() | TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CLEANUP_DONE | |
13 | DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION | TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE | |
3 | DeallocateContainerTransition() | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED | Constructor takes a bool that will "send event to speculator that we withdraw our container needs, if we're transitioning out of UNASSIGNED" |
13 | CLEANUP_CONTAINER_TRANSITION | TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_TIMED_OUT | |
2 | StatusUpdater() | TaskAttemptEventType.TA_UPDATE | |
2 | TaskCleanupTransition() | TaskAttemptEventType.TA_CONTAINER_CLEANED | |
57 total transitions, including "ignore" transitions. 41 transitions are accounted for by 8 named transitions.