/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow;

import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopStepStats;
import cascading.stats.StepStats;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

public class FlowStepJob
implements Callable<Throwable> {
    private final String stepName;
    private JobConf currentConf;
    private JobClient jobClient;
    private RunningJob runningJob;
    private long pollingInterval = 5000L;
    protected List<FlowStepJob> predecessors;
    private final CountDownLatch latch = new CountDownLatch(1);
    private boolean stop = false;
    private FlowStep flowStep;
    private HadoopStepStats stepStats;
    protected Throwable throwable;

    public FlowStepJob(final FlowStep flowStep, String stepName, JobConf currentConf) {
        this.flowStep = flowStep;
        this.stepName = stepName;
        this.currentConf = currentConf;
        this.pollingInterval = Flow.getJobPollingInterval(currentConf);
        if (flowStep.isDebugEnabled()) {
            flowStep.logDebug("using polling interval: " + this.pollingInterval);
        }
        this.stepStats = new HadoopStepStats(){

            @Override
            public Object getID() {
                return flowStep.getID();
            }

            @Override
            protected JobClient getJobClient() {
                return FlowStepJob.this.jobClient;
            }

            @Override
            protected RunningJob getRunningJob() {
                return FlowStepJob.this.runningJob;
            }
        };
    }

    public void stop() {
        if (this.flowStep.isInfoEnabled()) {
            this.flowStep.logInfo("stopping: " + this.stepName);
        }
        this.stop = true;
        if (!this.stepStats.isPending() && !this.stepStats.isFinished()) {
            this.stepStats.markStopped();
        }
        try {
            if (this.runningJob != null) {
                this.runningJob.killJob();
            }
        }
        catch (IOException exception) {
            this.flowStep.logWarn("unable to kill job: " + this.stepName, exception);
        }
    }

    public void setPredecessors(List<FlowStepJob> predecessors) throws IOException {
        this.predecessors = predecessors;
    }

    @Override
    public Throwable call() {
        this.start();
        return this.throwable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void start() {
        try {
            this.blockOnPredecessors();
            this.blockOnJob();
        }
        catch (Throwable throwable) {
            this.dumpCompletionEvents();
            this.throwable = throwable;
        }
        finally {
            this.latch.countDown();
        }
    }

    protected void blockOnJob() throws IOException {
        if (this.stop) {
            return;
        }
        if (this.flowStep.isInfoEnabled()) {
            this.flowStep.logInfo("starting step: " + this.stepName);
        }
        this.stepStats.markRunning();
        this.jobClient = new JobClient(this.currentConf);
        this.runningJob = this.jobClient.submitJob(this.currentConf);
        this.blockTillCompleteOrStopped();
        if (!this.stop && !this.runningJob.isSuccessful()) {
            if (!this.stepStats.isFinished()) {
                this.stepStats.markFailed(null);
            }
            this.dumpCompletionEvents();
            this.throwable = new FlowException("step failed: " + this.stepName);
        } else if (this.runningJob.isSuccessful() && !this.stepStats.isFinished()) {
            this.stepStats.markSuccessful();
        }
        this.stepStats.captureJobStats();
    }

    protected void blockTillCompleteOrStopped() throws IOException {
        while (!this.stop && !this.runningJob.isComplete()) {
            this.sleep();
        }
    }

    protected void sleep() {
        try {
            Thread.sleep(this.pollingInterval);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    protected void blockOnPredecessors() {
        for (FlowStepJob predecessor : this.predecessors) {
            if (predecessor.isSuccessful()) continue;
            this.flowStep.logWarn("abandoning step: " + this.stepName + ", predecessor failed: " + predecessor.stepName);
            this.stop();
        }
    }

    private void dumpCompletionEvents() {
        try {
            if (this.runningJob == null) {
                return;
            }
            TaskCompletionEvent[] events = this.runningJob.getTaskCompletionEvents(0);
            this.flowStep.logWarn("completion events count: " + events.length);
            for (TaskCompletionEvent event : events) {
                this.flowStep.logWarn("event = " + event);
            }
        }
        catch (IOException exception) {
            this.flowStep.logError("failed reading completion events", exception);
        }
    }

    public boolean isSuccessful() {
        try {
            this.latch.await();
            return this.runningJob != null && this.runningJob.isSuccessful();
        }
        catch (InterruptedException exception) {
            this.flowStep.logWarn("latch interrupted", exception);
        }
        catch (IOException exception) {
            this.flowStep.logWarn("error querying job", exception);
        }
        return false;
    }

    public boolean wasStarted() {
        return this.runningJob != null;
    }

    public StepStats getStepStats() {
        return this.stepStats;
    }
}

