
...

WIP distillation of thoughts on DCFT.

Python MapReduce Notes

Interesting that my implementation also had three levels: Master/Scheduler (Job), TaskWrapper (Task), and Task (TaskAttempt).  I used a "nested rules" approach, but in reality it could likely have been collapsed into one top-level rule set.  I should mention there is a great deal of similarity between a MapTask and a ReduceTask in terms of their rules.  How do you modularize Task classes?  Can you subclass a Task class?  The table below does not include the RPC rules.  It also does not account for the rules for the membership service (though those likely belong in a different module).

| Module | Rule Count | Comments |
| --- | --- | --- |
| Master/Scheduler | 1 | Basically just the rule to prevent deadlock by preempting tasks |
| MapTaskSet | 2 | For straggler reissue. |
| MapTask/Wrapper | 5 | |
| ReduceTaskSet | 0 | Just pool management. |
| ReduceTask | 4 | |
| Total | 12 | Server failure "event" rule is not included (i.e. the handler that sets the isAlive bit is not counted as a rule). "Event" rules for setting the status of an RPC are also not included. The RPC status is considered a state field. |

 

Hadoop MapReduce Walkthrough

...

Hadoop MapReduce State Machine Redundancy

| StateMachine | Total Transitions | Distinct Transitions | # Duplicate / # Distinct |
| --- | --- | --- | --- |
| JobImpl | 82 | 27 | 50/7 |
| TaskImpl | 24 | 16 | 6/3 |
| TaskAttemptImpl | 57 | 15 | 41/8 |
| Total | 163 | 58 | 97/18 |
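
A minimal sketch of how the columns above could be tallied, assuming my reading of them is right: "Total" counts every registered arc, "Distinct" counts distinct transition names, and "# Duplicate / # Distinct" counts the arcs that share a reused transition over the number of reused transitions. The `Arc`, `Redundancy`, and `RedundancyTally` names are hypothetical and not part of Hadoop, and "ignore" arcs would need some placeholder transition name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** One registered arc: a (state, event) pair handled by a named transition. */
final class Arc {
    final String fromState;
    final String event;
    final String transition;

    Arc(String fromState, String event, String transition) {
        this.fromState = fromState;
        this.event = event;
        this.transition = transition;
    }
}

/** Summary in the shape of one row of the table. */
final class Redundancy {
    final int totalArcs;
    final int distinctTransitions;
    final int arcsOnReused;       // "# Duplicate"
    final int reusedTransitions;  // "# Distinct" in the last column

    Redundancy(int totalArcs, int distinctTransitions,
               int arcsOnReused, int reusedTransitions) {
        this.totalArcs = totalArcs;
        this.distinctTransitions = distinctTransitions;
        this.arcsOnReused = arcsOnReused;
        this.reusedTransitions = reusedTransitions;
    }
}

final class RedundancyTally {
    /** Counts how often each transition name is registered and summarizes the reuse. */
    static Redundancy tally(List<Arc> arcs) {
        Map<String, Integer> uses = new LinkedHashMap<>();
        for (Arc arc : arcs) {
            uses.merge(arc.transition, 1, Integer::sum);
        }
        int arcsOnReused = 0;
        int reusedTransitions = 0;
        for (int count : uses.values()) {
            if (count > 1) {  // a transition registered on more than one arc
                arcsOnReused += count;
                reusedTransitions++;
            }
        }
        return new Redundancy(arcs.size(), uses.size(), arcsOnReused, reusedTransitions);
    }
}
```

The input would be the list of arcs registered with the state machine factory, one `Arc` per registration.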

 

JobImpl
| Count | Transition | Trigger (Event/State) | Comment |
| --- | --- | --- | --- |
| 12 | DIAGNOSTIC_UPDATE_TRANSITION | JobEventType.JOB_DIAGNOSTIC_UPDATE | Same for most named states (JobImpl) |
| 14 | COUNTER_UPDATE_TRANSITION | JobEventType.JOB_COUNTER_UPDATE | |
| 13 | INTERNAL_ERROR_TRANSITION | JobEventType.INTERNAL_ERROR | InternalErrorTransition extends InternalTerminationTransition by setting the error into the history string. |
| 5 | INTERNAL_REBOOT_TRANSITION | JobEventType.JOB_AM_REBOOT | InternalRebootTransition extends InternalTerminationTransition by setting the error in the history string. |
| 2 | TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION | JobEventType.JOB_TASK_ATTEMPT_COMPLETED | |
| 2 | KilledDuringAbortTransition() | JobEventType.JOB_KILL | From FAIL_WAIT and FAIL_ABORT |
| 2 | JobAbortCompletedTransition() | JobEventType.JOB_ABORT_COMPLETED | From FAIL_ABORT and KILL_ABORT |

82 total transitions including "ignore" transitions.  50 are accounted for by the above 7 transitions (6 if you consider the fact that INTERNAL_ERROR_TRANSITION and INTERNAL_REBOOT_TRANSITION are almost exactly the same).  There are 27 distinct named transitions.

List Candidates Based on events

JobEventType.JOB_KILL

  • KillNewJobTransition

  • KillInitedJobTransition

  • KilledDuringSetupTransition

  • KillTasksTransition

  • KilledDuringCommitTransition

  • KilledDuringAbortTransition

JobAbortCompletedTransition and KilledDuringAbortTransition do roughly the same thing: both call unsuccessfulFinish, which in turn issues some JobHistoryEvent and then calls finished.  finished is also called by KillNewJobTransition and CommitSucceededTransition.

 

Here there are 9 transitions that do small variations on dying.  Although they are all a little bit different, I'm not convinced the cleanup is really as complicated as they are making it.

No Format
private static class KilledDuringSetupTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.metrics.endRunningJob(job);
    job.addDiagnostic("Job received kill in SETUP state.");
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
  }
}

private static class CommitFailedTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event;
    job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  }
}

private static class KillInitedJobTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.addDiagnostic("Job received Kill in INITED state.");
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
  }
}

private static class KillWaitTaskCompletedTransition extends
    TaskCompletedTransition {
  @Override
  protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
    if (job.completedTaskCount == job.tasks.size()) {
      job.setFinishTime();
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
      return JobStateInternal.KILL_ABORT;
    }
    //return the current state, Job not finished yet
    return job.getInternalState();
  }
}

private static class KilledDuringCommitTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.setFinishTime();
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
  }
}
 
// Catch block of InitTransition
} catch (IOException e) {
  LOG.warn("Job init failed", e);
  job.metrics.endPreparingJob(job);
  job.addDiagnostic("Job init failed : "
      + StringUtils.stringifyException(e));
  job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
      job.jobContext,
      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  return JobStateInternal.FAILED;
}
 
private static class SetupFailedTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.metrics.endRunningJob(job);
    job.addDiagnostic("Job setup failed : "
        + ((JobSetupFailedEvent) event).getMessage());
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  }
}

//This transition happens when a job is to be failed. It waits for all the
//tasks to finish / be killed.
private static class JobFailWaitTransition
    implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
  @Override
  public JobStateInternal transition(JobImpl job, JobEvent event) {
    if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
      for(Task task: job.tasks.values()) {
        if(!task.isFinished()) {
          return JobStateInternal.FAIL_WAIT;
        }
      }
    }
    //Finished waiting. All tasks finished / were killed
    job.failWaitTriggerScheduledFuture.cancel(false);
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    return JobStateInternal.FAIL_ABORT;
  }
}

//This transition happens when a job to be failed times out while waiting on
//tasks that had been sent the KILL signal. It is triggered by a
//ScheduledFuture task queued in the executor.
private static class JobFailWaitTimedOutTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
        + " Going to fail job anyway");
    job.failWaitTriggerScheduledFuture.cancel(false);
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  }
}
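
Most of the single-arc transitions above follow the same shape: optional bookkeeping, an optional diagnostic, then a CommitterJobAbortEvent carrying KILLED or FAILED. The following is only a sketch of how that shape might be collapsed into one parameterized transition. `SketchJob`, `SketchArcTransition`, `AbortTransition`, and `AbortTransitions` are hypothetical stand-ins so that the example compiles on its own; they are not Hadoop classes, and the abort event is reduced to a recorded string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Same single-method shape as the SingleArcTransition used in the excerpts. */
interface SketchArcTransition<O, E> {
    void transition(O operand, E event);
}

/** Hypothetical, heavily simplified stand-in for JobImpl. */
final class SketchJob {
    final List<String> diagnostics = new ArrayList<>();
    // Stands in for job.eventHandler.handle(new CommitterJobAbortEvent(...)).
    final List<String> abortRequests = new ArrayList<>();
    boolean runningMetricEnded = false;
    boolean finishTimeSet = false;

    void addDiagnostic(String message) { diagnostics.add(message); }
    void endRunningJob() { runningMetricEnded = true; }
    void setFinishTime() { finishTimeSet = true; }
    void requestAbort(String finalState) { abortRequests.add(finalState); }
}

/** One parameterized transition in place of several near-identical classes. */
final class AbortTransition implements SketchArcTransition<SketchJob, String> {
    private final String finalState;                    // "KILLED" or "FAILED"
    private final Function<String, String> diagnostic;  // event message -> diagnostic, or null
    private final Consumer<SketchJob> bookkeeping;      // metrics or finish-time step

    AbortTransition(String finalState,
                    Function<String, String> diagnostic,
                    Consumer<SketchJob> bookkeeping) {
        this.finalState = finalState;
        this.diagnostic = diagnostic;
        this.bookkeeping = bookkeeping;
    }

    @Override
    public void transition(SketchJob job, String eventMessage) {
        bookkeeping.accept(job);
        String message = diagnostic.apply(eventMessage);
        if (message != null) {
            job.addDiagnostic(message);
        }
        job.requestAbort(finalState);
    }
}

/** Four of the transitions above expressed as instances rather than classes. */
final class AbortTransitions {
    static final AbortTransition KILLED_DURING_SETUP = new AbortTransition(
        "KILLED", e -> "Job received kill in SETUP state.", SketchJob::endRunningJob);
    static final AbortTransition KILL_INITED = new AbortTransition(
        "KILLED", e -> "Job received Kill in INITED state.", job -> { });
    static final AbortTransition KILLED_DURING_COMMIT = new AbortTransition(
        "KILLED", e -> null, SketchJob::setFinishTime);
    static final AbortTransition SETUP_FAILED = new AbortTransition(
        "FAILED", e -> "Job setup failed : " + e, SketchJob::endRunningJob);
}
```

The multiple-arc cases (KillWaitTaskCompletedTransition, JobFailWaitTransition) also choose the next state, so they would not fit this sketch without further parameters.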

 

TaskImpl

| Count | Transition | Trigger | Comment |
| --- | --- | --- | --- |
| 2 | KILL_TRANSITION | TaskEventType.T_KILL | |
| 2 | ATTEMPT_KILLED_TRANSITION | TaskEventType.T_ATTEMPT_KILLED | |
| 2 | AttemptFailedTransition() | TaskEventType.T_ATTEMPT_FAILED | |

24 total transitions, including "ignore" transitions.  There are 16 named transitions; 6 transitions are accounted for by 3 named transitions.

KillWaitAttemptSucceededTransition and KillWaitAttemptFailedTransition are essentially the same, both being trivial subclasses of KillWaitAttemptKilledTransition (3->1).

The majority of RetroactiveKilledTransition and RetroactiveFailureTransition is cleanup code, which is ALMOST copied and pasted between the two.  RetroactiveFailureTransition is actually a subclass of AttemptFailedTransition, and the super's transition is actually called (3->2).

InitialScheduleTransition and RedundantScheduleTransition look almost the same, less some special handling.  I can imagine a system where this is not necessary (2->1.5).

TaskAttemptImpl
| Count | Transition | Trigger | Comment |
| --- | --- | --- | --- |
| 2 | RequestContainerTransition() | TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_RESCHEDULE | A bool is passed to the constructor to do cleanup in one case and not the other |
| 4 | KilledTransition() | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CLEANUP_DONE | |
| 2 | FailedTransition() | TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CLEANUP_DONE | |
| 13 | DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION | TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE | |
| 3 | DeallocateContainerTransition() | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED | Constructor takes a bool that will "send event to speculator that we withdraw our container needs, if we're transitioning out of UNASSIGNED" |
| 13 | CLEANUP_CONTAINER_TRANSITION | TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_TIMED_OUT | |
| 2 | StatusUpdater() | TaskAttemptEventType.TA_UPDATE | |
| 2 | TaskCleanupTransition() | TaskAttemptEventType.TA_CONTAINER_CLEANED | |

57 total transitions, including "ignore" transitions.  41 transitions are accounted for by 8 named transitions.

Additional Notes:

  • Event "ignore" lists are necessary in this implementation for events that come in but shouldn't affect the state machine.
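
To illustrate why the "ignore" lists show up as transitions, here is a minimal table-driven sketch. It assumes, as I understand the Hadoop state machine to behave, that an event with no registered arc for the current state is rejected as invalid, so events that may arrive late have to be registered as self-loops with no side effects. `TinyStateMachine`, `SketchState`, and `SketchEvent` are hypothetical names and this is not Hadoop's implementation.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum SketchState { RUNNING, KILLED }

enum SketchEvent { UPDATE, DIAGNOSTICS, KILL }

final class TinyStateMachine {
    // (current state, event) -> next state; a missing entry means "invalid".
    private final Map<SketchState, Map<SketchEvent, SketchState>> arcs =
        new EnumMap<SketchState, Map<SketchEvent, SketchState>>(SketchState.class);
    private SketchState current;

    TinyStateMachine(SketchState initial) {
        this.current = initial;
    }

    TinyStateMachine addTransition(SketchState from, SketchState to, SketchEvent event) {
        arcs.computeIfAbsent(from,
            s -> new EnumMap<SketchEvent, SketchState>(SketchEvent.class)).put(event, to);
        return this;
    }

    /** An "ignore" list: each event becomes a self-loop that does nothing. */
    TinyStateMachine ignore(SketchState state, Set<SketchEvent> events) {
        for (SketchEvent event : events) {
            addTransition(state, state, event);
        }
        return this;
    }

    SketchState handle(SketchEvent event) {
        Map<SketchEvent, SketchState> fromCurrent = arcs.get(current);
        SketchState next = (fromCurrent == null) ? null : fromCurrent.get(event);
        if (next == null) {
            throw new IllegalStateException(
                "Invalid event " + event + " in state " + current);
        }
        current = next;
        return current;
    }
}
```

Every ignored event adds one arc per state, which is why these self-loops count toward the total transition counts above even though they carry no logic.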

...