2/17

2/10 Meeting - Conversation about Actors

Similarities to Actors

Differences to Actors

Sound Bytes

Fragments from Agha Dissertation

1/27 Meeting 

TODO

Sound Bytes

Collin's Random Notes

WIP distillation of thoughts on DCFT.

Python MapReduce Notes

Interesting that my implementation also had three levels: Master/Scheduler (Job), TaskWrapper (Task), and Task (TaskAttempt).  I used a "nested rules" approach, but in reality it likely could have been collapsed into one top-level rule set.  I should mention there is a great deal of similarity between a MapTask and a ReduceTask in terms of their rules.  How do you modularize Task classes?  Can you subclass a Task class?  The table below does not include the RPC rules.  It also does not account for the rules for the membership service (though that likely should be a separate module).

Module           | Rule Count | Comments
Master/Scheduler | 1          | Basically just the rule to prevent deadlock by preempting tasks.
MapTaskSet       | 2          | For straggler reissue.
MapTask/Wrapper  | 5          |
ReduceTaskSet    | 0          | Just pool management.
ReduceTask       | 4          |
Total            | 12         | The server failure "event" rule is not included (i.e. the handler that sets the isAlive bit is not counted as a rule). "Event" rules for setting the status of an RPC are also not included; the RPC status is considered a state field.
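
For reference, a minimal sketch of what a rule looks like in this style: a condition over module state paired with an action, fired by a scheduler loop. Everything here (the Rule interface, the straggler-reissue rule, the 30-second threshold) is invented for illustration, and written in Java rather than the Python of the actual implementation.

 import java.util.List;

 // A rule pairs a condition over some state with an action to run when
 // the condition holds; a scheduler loop polls the rules repeatedly.
 interface Rule<S> {
     boolean condition(S state);
     void action(S state);
 }

 class MapTaskState {
     boolean done;
     boolean reissued;
     long startedAtMs;
 }

 class MapTaskSetState {
     List<MapTaskState> tasks;
     long nowMs;
 }

 // Sketch of the straggler-reissue rule counted under MapTaskSet above.
 class StragglerReissueRule implements Rule<MapTaskSetState> {
     static final long STRAGGLER_TIMEOUT_MS = 30_000; // invented threshold

     @Override
     public boolean condition(MapTaskSetState s) {
         for (MapTaskState t : s.tasks) {
             if (!t.done && !t.reissued
                     && s.nowMs - t.startedAtMs > STRAGGLER_TIMEOUT_MS) {
                 return true;
             }
         }
         return false;
     }

     @Override
     public void action(MapTaskSetState s) {
         for (MapTaskState t : s.tasks) {
             if (!t.done && !t.reissued
                     && s.nowMs - t.startedAtMs > STRAGGLER_TIMEOUT_MS) {
                 t.reissued = true;
                 // ...launch a duplicate attempt for this map task...
             }
         }
     }
 }

Sharing a base rule set along these lines is one possible answer to the MapTask/ReduceTask modularization question above.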

 

Hadoop MapReduce Walkthrough

MRAppMaster.java

1354: main starts here

1385: MRAppMaster constructed as appMaster

1409: initAndStartAppMaster called on appMaster

1450: initAndStartAppMaster starts here

1479: appMaster init and start; MRAppMaster is a CompositeService, which is an AbstractService; init calls serviceInit and start calls serviceStart

252: serviceInit creates a whole bunch of event handlers that I don't yet understand.  There are also a bunch of "Dispatchers" whose only job is to call the handle method on the correct handler object (see the sketch at the end of this walkthrough).

1019: serviceStart calls createJob and startJobs

632: createJob constructs a JobImpl

1251: startJobs gets the ball rolling by generating the first startJobEvent and calling the handler.
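
Sketch of the dispatcher idea mentioned at serviceInit, assuming simplified Event and EventHandler shapes rather than Hadoop's actual generics (the real interfaces live in org.apache.hadoop.yarn.event; the production AsyncDispatcher also queues events and dispatches from its own thread):

 import java.util.HashMap;
 import java.util.Map;

 interface Event {
     Enum<?> getType();
 }

 interface EventHandler<E extends Event> {
     void handle(E event);
 }

 // A dispatcher's only job: look up the handler registered for an
 // event's type and call its handle method.
 class SimpleDispatcher {
     private final Map<Class<?>, EventHandler<Event>> handlers = new HashMap<>();

     @SuppressWarnings("unchecked")
     public void register(Class<? extends Enum<?>> eventType,
                          EventHandler<? extends Event> handler) {
         handlers.put(eventType, (EventHandler<Event>) handler);
     }

     public void dispatch(Event event) {
         // Route on the enum class of the event's type, e.g. JobEventType.
         EventHandler<Event> h =
             handlers.get(event.getType().getDeclaringClass());
         if (h == null) {
             throw new IllegalStateException("No handler for " + event.getType());
         }
         h.handle(event);
     }
 }

MRAppMaster registers one handler per event-type enum (JobEventType, TaskEventType, ...) in roughly this way; components then fire events at the dispatcher instead of calling each other directly.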

JobImpl.java

967: handle method calls doTransition on the stateMachine

299: SetupCompletedTransition()

1560: scheduleTasks

958: Issue new task event

TaskImpl.java

157: InitialScheduleTransition

879: addAndScheduleAttempt

588: Creates a new TaskAttempt (a TaskAttemptImpl) and sends a TA_SCHEDULE event.

TaskAttemptImpl.java

 

StateMachineFactory.java

This describes how the state machine works internally.  What is important to know is that the addTransition method takes the preState, postState, eventType, and transition.  The transition is an object whose transition method will be called during doTransition (which is invoked by the event handler).
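
A schematic of that mechanism, stripped down to the single-arc case (Hadoop's actual StateMachineFactory is generic over operand/state/event-type/event in the same way, but also supports multiple-arc transitions and builds an immutable transition topology):

 import java.util.HashMap;
 import java.util.Map;

 // Hook interface: mirrors Hadoop's SingleArcTransition.
 interface SingleArcTransition<OPERAND, EVENT> {
     void transition(OPERAND operand, EVENT event);
 }

 // Stripped-down state machine: a (preState, eventType) -> (postState,
 // hook) table, plus the doTransition that event handlers call.
 class SimpleStateMachine<OPERAND, STATE, EVENTTYPE, EVENT> {

     private class Arc {
         final STATE postState;
         final SingleArcTransition<OPERAND, EVENT> hook;
         Arc(STATE postState, SingleArcTransition<OPERAND, EVENT> hook) {
             this.postState = postState;
             this.hook = hook;
         }
     }

     private final Map<STATE, Map<EVENTTYPE, Arc>> table = new HashMap<>();
     private final OPERAND operand;
     private STATE current;

     SimpleStateMachine(OPERAND operand, STATE initial) {
         this.operand = operand;
         this.current = initial;
     }

     // Mirrors addTransition(preState, postState, eventType, transition).
     void addTransition(STATE pre, STATE post, EVENTTYPE type,
                        SingleArcTransition<OPERAND, EVENT> hook) {
         table.computeIfAbsent(pre, k -> new HashMap<>())
              .put(type, new Arc(post, hook));
     }

     // Called by the event handler: run the transition object's hook,
     // then move to the arc's postState.
     STATE doTransition(EVENTTYPE type, EVENT event) {
         Map<EVENTTYPE, Arc> arcs = table.get(current);
         if (arcs == null || !arcs.containsKey(type)) {
             throw new IllegalStateException(
                 "Invalid event " + type + " at state " + current);
         }
         Arc arc = arcs.get(type);
         arc.hook.transition(operand, event);
         current = arc.postState;
         return current;
     }
 }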

Hadoop MapReduce State Machine Redundancy

State Machine   | Total Transitions | Distinct Transitions | # Duplicate / # Distinct
JobImpl         | 82                | 27                   | 50/7
TaskImpl        | 24                | 16                   | 6/3
TaskAttemptImpl | 57                | 15                   | 41/8
Total           | 163               | 58                   | 97/18

 

JobImpl
Count | Transition                               | Trigger (Event/State)                    | Comment
12    | DIAGNOSTIC_UPDATE_TRANSITION             | JobEventType.JOB_DIAGNOSTIC_UPDATE       | Same for most named states (JobImpl)
14    | COUNTER_UPDATE_TRANSITION                | JobEventType.JOB_COUNTER_UPDATE          |
13    | INTERNAL_ERROR_TRANSITION                | JobEventType.INTERNAL_ERROR              | InternalErrorTransition extends InternalTerminationTransition by setting the error into the history string.
5     | INTERNAL_REBOOT_TRANSITION               | JobEventType.JOB_AM_REBOOT               | InternalRebootTransition extends InternalTerminationTransition by setting the error in the history string.
2     | TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION  | JobEventType.JOB_TASK_ATTEMPT_COMPLETED  |
2     | KilledDuringAbortTransition()            | JobEventType.JOB_KILL                    | From FAIL_WAIT and FAIL_ABORT
2     | JobAbortCompletedTransition()            | JobEventType.JOB_ABORT_COMPLETED         | From FAIL_ABORT and KILL_ABORT

82 total transitions, including "ignore" transitions.  50 are accounted for by the above 7 transitions (6, if you consider that INTERNAL_ERROR_TRANSITION and INTERNAL_REBOOT_TRANSITION are almost exactly the same).  There are 27 distinct named transitions.

Consolidation Candidates Based on Events

JobEventType.JOB_KILL

JobAbortCompletedTransition and KilledDuringAbortTransition do roughly the same thing: both call unsuccessfulFinish, which in turn issues a JobHistoryEvent and then calls finished.  finished is also called by KillNewJobTransition and CommitSucceededTransition.

 

Here there are 9 transitions that are small variations on dying.  Although they are all slightly different, I'm not convinced the cleanup is really as complicated as they make it; see the consolidation sketch after these excerpts.

 private static class KilledDuringSetupTransition
 implements SingleArcTransition<JobImpl, JobEvent> {
 @Override
 public void transition(JobImpl job, JobEvent event) {
 job.metrics.endRunningJob(job);
 job.addDiagnostic("Job received kill in SETUP state.");
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
 }
 }
 
 private static class CommitFailedTransition implements
 SingleArcTransition<JobImpl, JobEvent> {
 @Override
 public void transition(JobImpl job, JobEvent event) {
 JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event;
 job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
 }
 }
 private static class KillInitedJobTransition
 implements SingleArcTransition<JobImpl, JobEvent> {
 @Override
 public void transition(JobImpl job, JobEvent event) {
 job.addDiagnostic("Job received Kill in INITED state.");
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
 }
 }
 
 private static class KillWaitTaskCompletedTransition extends 
 TaskCompletedTransition {
 @Override
 protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
 if (job.completedTaskCount == job.tasks.size()) {
 job.setFinishTime();
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
 return JobStateInternal.KILL_ABORT;
 }
 //return the current state, Job not finished yet
 return job.getInternalState();
 }
 }
 
 private static class KilledDuringCommitTransition implements
 SingleArcTransition<JobImpl, JobEvent> {
 @Override
 public void transition(JobImpl job, JobEvent event) {
 job.setFinishTime();
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
 }
 }
 
 // Catch block of InitTransition
 } catch (IOException e) {
 LOG.warn("Job init failed", e);
 job.metrics.endPreparingJob(job);
 job.addDiagnostic("Job init failed : "
 + StringUtils.stringifyException(e));
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
 return JobStateInternal.FAILED;
 }
 
 private static class SetupFailedTransition
 implements SingleArcTransition<JobImpl, JobEvent> {
 @Override
 public void transition(JobImpl job, JobEvent event) {
 job.metrics.endRunningJob(job);
 job.addDiagnostic("Job setup failed : "
 + ((JobSetupFailedEvent) event).getMessage());
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext,
 org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
 }
 }
 
 //This transition happens when a job is to be failed. It waits for all the
 //tasks to finish / be killed.
 private static class JobFailWaitTransition
 implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 @Override
 public JobStateInternal transition(JobImpl job, JobEvent event) {
 if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
 for(Task task: job.tasks.values()) {
 if(!task.isFinished()) {
 return JobStateInternal.FAIL_WAIT;
 }
 }
 }
 //Finished waiting. All tasks finished / were killed
 job.failWaitTriggerScheduledFuture.cancel(false);
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
 return JobStateInternal.FAIL_ABORT;
 }
 }
 
 //This transition happens when a job to be failed times out while waiting on
 //tasks that had been sent the KILL signal. It is triggered by a
 //ScheduledFuture task queued in the executor.
 private static class JobFailWaitTimedOutTransition
 implements SingleArcTransition<JobImpl, JobEvent> {
 @Override
 public void transition(JobImpl job, JobEvent event) {
 LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
 + " Going to fail job anyway");
 job.failWaitTriggerScheduledFuture.cancel(false);
 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
 job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
 }
 }
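
Most of these differ only in the diagnostic string, the final JobStatus.State passed to CommitterJobAbortEvent, and one or two optional side effects.  A hypothetical consolidation (AbortTransition is an invented name, and this is a fragment in the style of the excerpts above, not working Hadoop code):

 // Hypothetical consolidation of the near-duplicate abort transitions
 // above; not part of Hadoop.
 private static class AbortTransition
     implements SingleArcTransition<JobImpl, JobEvent> {
   private final String diagnostic;
   private final org.apache.hadoop.mapreduce.JobStatus.State finalState;
   private final boolean endRunningMetrics; // only some variants update metrics

   AbortTransition(String diagnostic,
       org.apache.hadoop.mapreduce.JobStatus.State finalState,
       boolean endRunningMetrics) {
     this.diagnostic = diagnostic;
     this.finalState = finalState;
     this.endRunningMetrics = endRunningMetrics;
   }

   @Override
   public void transition(JobImpl job, JobEvent event) {
     if (endRunningMetrics) {
       job.metrics.endRunningJob(job);
     }
     job.addDiagnostic(diagnostic);
     job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
         job.jobContext, finalState));
   }
 }

 // KilledDuringSetupTransition would become:
 //   new AbortTransition("Job received kill in SETUP state.",
 //       org.apache.hadoop.mapreduce.JobStatus.State.KILLED, true)
 // and KillInitedJobTransition:
 //   new AbortTransition("Job received Kill in INITED state.",
 //       org.apache.hadoop.mapreduce.JobStatus.State.KILLED, false)

The variants that need an event-derived message (CommitFailedTransition) or a setFinishTime call (the kill-wait and killed-during-commit cases) would still need small wrappers, but that is 2-3 classes instead of 9.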

 

TaskImpl

Count | Transition                 | Trigger                          | Comment
2     | KILL_TRANSITION            | TaskEventType.T_KILL             |
2     | ATTEMPT_KILLED_TRANSITION  | TaskEventType.T_ATTEMPT_KILLED   |
2     | AttemptFailedTransition()  | TaskEventType.T_ATTEMPT_FAILED   |

24 total transitions, including "ignore" transitions.  There are 16 named transitions; 6 transitions are accounted for by 3 named transitions.

KillWaitAttemptSucceededTransition and KillWaitAttemptFailedTransition are essentially the same, both being trivial subclasses of KillWaitAttemptKilledTransition (3->1).

The majority of RetroactiveKilledTransition and RetroactiveFailureTransition is cleanup code, for which they are ALMOST copy-and-pasted.  RetroactiveFailureTransition is actually a subclass of AttemptFailedTransition, and the super's transition is actually called (3->2); a schematic of this shape follows below.

InitialScheduleTransition and RedundantScheduleTransition look almost the same, less some special handling; I can imagine a system where this is not necessary (2->1.5).
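
For the record, the subclass-plus-super-call shape from the Retroactive* note looks roughly like this (method bodies are placeholders, not Hadoop's actual code):

 private static class AttemptFailedTransition
     implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
   @Override
   public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
     // ...count the failure and either schedule a new attempt or fail
     // the task...
     return TaskStateInternal.RUNNING; // placeholder result
   }
 }

 private static class RetroactiveFailureTransition
     extends AttemptFailedTransition {
   @Override
   public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
     // ...extra cleanup for an attempt that fails after the task already
     // saw a successful attempt (e.g. invalidating a map's output)...
     return super.transition(task, event);
   }
 }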

TaskAttemptImpl
Count | Transition                                | Trigger                                                                                                                                                                  | Comment
2     | RequestContainerTransition()              | TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_RESCHEDULE                                                                                                     | A bool is passed to the constructor to do cleanup in one case and not the other
4     | KilledTransition()                        | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CLEANUP_DONE                                                            |
2     | FailedTransition()                        | TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CLEANUP_DONE                                                                                                    |
13    | DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION  | TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE                                                                                                                               |
3     | DeallocateContainerTransition()           | TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED                                                           | Constructor takes a bool that will "send event to speculator that we withdraw our container needs, if we're transitioning out of UNASSIGNED"
13    | CLEANUP_CONTAINER_TRANSITION              | TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_TIMED_OUT |
2     | StatusUpdater()                           | TaskAttemptEventType.TA_UPDATE                                                                                                                                           |
2     | TaskCleanupTransition()                   | TaskAttemptEventType.TA_CONTAINER_CLEANED                                                                                                                                |

57 transitions, including "ignore" transitions.  41 transitions are accounted for by the above 8 named transitions.

Additional Notes:

No support list in Python MapReduce