...

Here there are 9 transitions that are small variations on dying. Although they are all slightly different, I'm not convinced the cleanup is really as complicated as they make it look (a consolidation sketch follows the listings below).

private static class KilledDuringSetupTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.metrics.endRunningJob(job);
    job.addDiagnostic("Job received kill in SETUP state.");
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
  }
}
 
 
private static class CommitFailedTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    JobCommitFailedEvent jcfe = (JobCommitFailedEvent) event;
    job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  }
}


private static class KillInitedJobTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.addDiagnostic("Job received Kill in INITED state.");
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
  }
}
 
 
private static class KillWaitTaskCompletedTransition extends
    TaskCompletedTransition {
  @Override
  protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
    if (job.completedTaskCount == job.tasks.size()) {
      job.setFinishTime();
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
      return JobStateInternal.KILL_ABORT;
    }
    // return the current state, Job not finished yet
    return job.getInternalState();
  }
}

 
private static class KilledDuringCommitTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.setFinishTime();
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
  }
}
 
// Catch block of InitTransition
} catch (IOException e) {
  LOG.warn("Job init failed", e);
  job.metrics.endPreparingJob(job);
  job.addDiagnostic("Job init failed : "
      + StringUtils.stringifyException(e));
  job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
      job.jobContext,
      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  return JobStateInternal.FAILED;
}
     
 
private static class SetupFailedTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.metrics.endRunningJob(job);
    job.addDiagnostic("Job setup failed : "
        + ((JobSetupFailedEvent) event).getMessage());
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext,
        org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  }
}
 
// This transition happens when a job is to be failed. It waits for all the
// tasks to finish / be killed.
private static class JobFailWaitTransition
    implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
  @Override
  public JobStateInternal transition(JobImpl job, JobEvent event) {
    if (!job.failWaitTriggerScheduledFuture.isCancelled()) {
      for (Task task : job.tasks.values()) {
        if (!task.isFinished()) {
          return JobStateInternal.FAIL_WAIT;
        }
      }
    }
    // Finished waiting. All tasks finished / were killed
    job.failWaitTriggerScheduledFuture.cancel(false);
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    return JobStateInternal.FAIL_ABORT;
  }
}
 
 
// This transition happens when a job to be failed times out while waiting on
// tasks that had been sent the KILL signal. It is triggered by a
// ScheduledFuture task queued in the executor.
private static class JobFailWaitTimedOutTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
        + " Going to fail job anyway");
    job.failWaitTriggerScheduledFuture.cancel(false);
    job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  }
}
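
To make the point concrete, here is a minimal sketch of the consolidation, assuming a hypothetical abortJob helper on JobImpl (nothing with that name exists in the code today). Each transition would keep only its variant-specific bookkeeping and delegate the shared tail to one method:

// Hypothetical helper (not in JobImpl): every one of the nine paths ends by
// asking the committer to abort with a final state, usually after adding a
// diagnostic, so that shared tail can live in one place.
private void abortJob(org.apache.hadoop.mapreduce.JobStatus.State finalState,
    String diagnostic) {
  if (diagnostic != null) {
    addDiagnostic(diagnostic);
  }
  eventHandler.handle(new CommitterJobAbortEvent(jobId, jobContext,
      finalState));
}

// KilledDuringSetupTransition rewritten against the helper; the other
// transitions would shrink the same way.
private static class KilledDuringSetupTransition
    implements SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.metrics.endRunningJob(job);  // variant-specific bookkeeping stays
    job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED,
        "Job received kill in SETUP state.");
  }
}

What genuinely differs between the transitions (setFinishTime, the metrics calls, the JobStateInternal returned) stays behind, which suggests the cleanup is really a few lines per state plus one shared abort.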

 

TaskImpl

Count | Transition                | Trigger                        | Comment
2     | KILL_TRANSITION           | TaskEventType.T_KILL           |
2     | ATTEMPT_KILLED_TRANSITION | TaskEventType.T_ATTEMPT_KILLED |
2     | AttemptFailedTransition() | TaskEventType.T_ATTEMPT_FAILED |

...