/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.tasktracker.processor;

import com.github.ltsopensource.core.AppContext;
import com.github.ltsopensource.core.commons.utils.Callable;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.domain.JobRunResult;
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
import com.github.ltsopensource.core.exception.RequestTimeoutException;
import com.github.ltsopensource.core.failstore.FailStorePathBuilder;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.AbstractRemotingCommandBody;
import com.github.ltsopensource.core.protocol.command.JobCompletedRequest;
import com.github.ltsopensource.core.protocol.command.JobPushRequest;
import com.github.ltsopensource.core.remoting.RemotingClientDelegate;
import com.github.ltsopensource.core.support.NodeShutdownHook;
import com.github.ltsopensource.core.support.RetryScheduler;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.Channel;
import com.github.ltsopensource.remoting.RemotingCommandBody;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.exception.RemotingCommandException;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.remoting.protocol.RemotingProtos;
import com.github.ltsopensource.tasktracker.domain.Response;
import com.github.ltsopensource.tasktracker.domain.TaskTrackerAppContext;
import com.github.ltsopensource.tasktracker.expcetion.NoAvailableJobRunnerException;
import com.github.ltsopensource.tasktracker.processor.AbstractProcessor;
import com.github.ltsopensource.tasktracker.runner.RunnerCallback;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class JobPushProcessor
extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobPushProcessor.class);
    private RetryScheduler<JobRunResult> retryScheduler;
    private JobRunnerCallback jobRunnerCallback;
    private RemotingClientDelegate remotingClient;

    protected JobPushProcessor(TaskTrackerAppContext appContext) {
        super(appContext);
        this.remotingClient = appContext.getRemotingClient();
        this.retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), (AppContext)appContext, FailStorePathBuilder.getJobFeedbackPath((AppContext)appContext), 3){

            protected boolean isRemotingEnable() {
                return JobPushProcessor.this.remotingClient.isServerEnable();
            }

            protected boolean retry(List<JobRunResult> results) {
                return JobPushProcessor.this.retrySendJobResults(results);
            }
        };
        this.retryScheduler.start();
        this.jobRunnerCallback = new JobRunnerCallback();
        NodeShutdownHook.registerHook((AppContext)appContext, (String)this.getClass().getName(), (Callable)new Callable(){

            public void call() throws Exception {
                JobPushProcessor.this.retryScheduler.stop();
            }
        });
    }

    public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
        JobPushRequest requestBody = (JobPushRequest)request.getBody();
        JobMeta jobMeta = requestBody.getJobMeta();
        try {
            this.appContext.getRunnerPool().execute(jobMeta, this.jobRunnerCallback);
        }
        catch (NoAvailableJobRunnerException e) {
            return RemotingCommand.createResponseCommand((int)JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code(), (String)"job push failure , no available job runner!");
        }
        return RemotingCommand.createResponseCommand((int)JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code(), (String)"job push success!");
    }

    private boolean retrySendJobResults(List<JobRunResult> results) {
        JobCompletedRequest requestBody = (JobCompletedRequest)this.appContext.getCommandBodyWrapper().wrapper((AbstractRemotingCommandBody)new JobCompletedRequest());
        requestBody.setJobRunResults(results);
        requestBody.setReSend(true);
        int requestCode = JobProtos.RequestCode.JOB_COMPLETED.code();
        RemotingCommand request = RemotingCommand.createRequestCommand((int)requestCode, (RemotingCommandBody)requestBody);
        try {
            RemotingCommand commandResponse = this.remotingClient.invokeSync(request);
            if (commandResponse != null && commandResponse.getCode() == RemotingProtos.ResponseCode.SUCCESS.code()) {
                return true;
            }
            LOGGER.warn("Send job failed, {}", new Object[]{commandResponse});
            return false;
        }
        catch (JobTrackerNotFoundException e) {
            LOGGER.error("Retry send job result failed! jobResults={}", new Object[]{results, e});
            return false;
        }
    }

    private class JobRunnerCallback
    implements RunnerCallback {
        private JobRunnerCallback() {
        }

        @Override
        public JobMeta runComplete(Response response) {
            final JobRunResult jobRunResult = new JobRunResult();
            jobRunResult.setTime(Long.valueOf(SystemClock.now()));
            jobRunResult.setJobMeta(response.getJobMeta());
            jobRunResult.setAction(response.getAction());
            jobRunResult.setMsg(response.getMsg());
            JobCompletedRequest requestBody = (JobCompletedRequest)JobPushProcessor.this.appContext.getCommandBodyWrapper().wrapper((AbstractRemotingCommandBody)new JobCompletedRequest());
            requestBody.addJobResult(jobRunResult);
            requestBody.setReceiveNewJob(response.isReceiveNewJob());
            int requestCode = JobProtos.RequestCode.JOB_COMPLETED.code();
            RemotingCommand request = RemotingCommand.createRequestCommand((int)requestCode, (RemotingCommandBody)requestBody);
            final Response returnResponse = new Response();
            try {
                final CountDownLatch latch = new CountDownLatch(1);
                JobPushProcessor.this.remotingClient.invokeAsync(request, new AsyncCallback(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void operationComplete(ResponseFuture responseFuture) {
                        try {
                            RemotingCommand commandResponse = responseFuture.getResponseCommand();
                            if (commandResponse != null && commandResponse.getCode() == RemotingProtos.ResponseCode.SUCCESS.code()) {
                                JobPushRequest jobPushRequest = (JobPushRequest)commandResponse.getBody();
                                if (jobPushRequest != null) {
                                    if (LOGGER.isDebugEnabled()) {
                                        LOGGER.debug("Get new job :{}", new Object[]{jobPushRequest.getJobMeta()});
                                    }
                                    returnResponse.setJobMeta(jobPushRequest.getJobMeta());
                                }
                            } else {
                                if (LOGGER.isInfoEnabled()) {
                                    LOGGER.info("Job feedback failed, save local files\u3002{}", new Object[]{jobRunResult});
                                }
                                try {
                                    JobPushProcessor.this.retryScheduler.inSchedule(jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(), (Object)jobRunResult);
                                }
                                catch (Exception e) {
                                    LOGGER.error("Job feedback failed", (Throwable)e);
                                }
                            }
                        }
                        finally {
                            latch.countDown();
                        }
                    }
                });
                try {
                    latch.await(60000L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    throw new RequestTimeoutException((Throwable)e);
                }
            }
            catch (JobTrackerNotFoundException e) {
                try {
                    LOGGER.warn("No job tracker available! save local files.");
                    JobPushProcessor.this.retryScheduler.inSchedule(jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(), (Object)jobRunResult);
                }
                catch (Exception e1) {
                    LOGGER.error("Save files failed, {}", new Object[]{jobRunResult.getJobMeta(), e1});
                }
            }
            return returnResponse.getJobMeta();
        }
    }
}

