...
Here are the 9 transitions that implement small variations on a job dying (being killed or failing). Although each one is a little different, I'm not convinced the cleanup is really as complicated as they make it.
Wiki Markup (noformat) |
---|
private static class KilledDuringSetupTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.metrics.endRunningJob(job); job.addDiagnostic("Job received kill in SETUP state."); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); } } private static class CommitFailedTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void public void transition(JobImpl job, JobEvent event) { JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event; job.addDiagnostic("Job commit failed: " + jcfe.getMessage()); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } } private static class KillInitedJobTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.addDiagnostic("Job received Kill in INITED state."); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, jobjob.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); } } private static class KillWaitTaskCompletedTransition extends TaskCompletedTransition { @Override protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { if (job.completedTaskCount == job.tasks.size()) { job.setFinishTime(); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); return JobStateInternal.KILL_ABORT; } //return the current state, Job not finished yet return job.getInternalState(); } } private static class KilledDuringCommitTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { jobjob.setFinishTime(); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, 
org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); } } // Catch of InitTransaction } catch (IOException e) { LOG.warn("Job init failed", e); job.metrics.endPreparingJob(job); job.addDiagnostic("Job init failed : " + StringUtils.stringifyException(e)); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); return JobStateInternal.FAILED; } private static class SetupFailedTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { job.metrics.endRunningJob(job); job.addDiagnostic("Job setup failed : " + ((JobSetupFailedEvent) event).getMessage()); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } } //This transition happens when a job is to be failed. It waits for all the //tasks to finish / be killed. private static class JobFailWaitTransition implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { @Override public JobStateInternal transition(JobImpl job, JobEvent event) { if(!job.failWaitTriggerScheduledFuture.isCancelled()) { for(Task task: job.tasks.values()) { if(!task.isFinished()) { return JobStateInternal.FAIL_WAIT; } } } //Finished waiting. All tasks finished / were killed job.failWaitTriggerScheduledFuture.cancel(false); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); return JobStateInternal.FAIL_ABORT; } } //This transition happens when a job to be failed times out while waiting on //tasks that had been sent the KILL signal. It is triggered by a //ScheduledFuture task queued in the executor. private static class JobFailWaitTimedOutTransition implements SingleArcTransition<JobImpl, JobEvent> { @Override public void transition(JobImpl job, JobEvent event) { LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." 
+ " Going to fail job anyway"); job.failWaitTriggerScheduledFuture.cancel(false); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } } |
TaskImpl
Count | Transition | Trigger | Comment |
---|---|---|---|
2 | KILL_TRANSITION | TaskEventType.T_KILL | |
2 | ATTEMPT_KILLED_TRANSITION | TaskEventType.T_ATTEMPT_KILLED | |
2 | AttemptFailedTransition() | TaskEventType.T_ATTEMPT_FAILED | |
...