/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class TaskExecutor
extends RpcEndpoint
implements TaskExecutorGateway {
    public static final String TASK_MANAGER_NAME = "taskmanager";
    private final TaskManagerLocation taskManagerLocation;
    public static final int MAX_BLOB_PORT = 65536;
    private final HighAvailabilityServices haServices;
    private final TaskManagerConfiguration taskManagerConfiguration;
    private final IOManager ioManager;
    private final MemoryManager memoryManager;
    private final NetworkEnvironment networkEnvironment;
    private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
    private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
    private final FatalErrorHandler fatalErrorHandler;
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    private final BroadcastVariableManager broadcastVariableManager;
    private final FileCache fileCache;
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
    private Map<ResourceID, JobManagerConnection> jobManagerConnections;
    private final TaskSlotTable taskSlotTable;
    private final JobManagerTable jobManagerTable;
    private final JobLeaderService jobLeaderService;

    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, TaskSlotTable taskSlotTable, JobManagerTable jobManagerTable, JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
        Preconditions.checkArgument((taskManagerConfiguration.getNumberSlots() > 0 ? 1 : 0) != 0, (Object)"The number of slots has to be larger than 0.");
        this.taskManagerConfiguration = (TaskManagerConfiguration)Preconditions.checkNotNull((Object)taskManagerConfiguration);
        this.taskManagerLocation = (TaskManagerLocation)Preconditions.checkNotNull((Object)taskManagerLocation);
        this.memoryManager = (MemoryManager)Preconditions.checkNotNull((Object)memoryManager);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.networkEnvironment = (NetworkEnvironment)Preconditions.checkNotNull((Object)networkEnvironment);
        this.haServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)haServices);
        this.taskSlotTable = (TaskSlotTable)Preconditions.checkNotNull((Object)taskSlotTable);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.taskManagerMetricGroup = (TaskManagerMetricGroup)Preconditions.checkNotNull((Object)taskManagerMetricGroup);
        this.broadcastVariableManager = (BroadcastVariableManager)Preconditions.checkNotNull((Object)broadcastVariableManager);
        this.fileCache = (FileCache)Preconditions.checkNotNull((Object)fileCache);
        this.jobManagerTable = (JobManagerTable)Preconditions.checkNotNull((Object)jobManagerTable);
        this.jobLeaderService = (JobLeaderService)Preconditions.checkNotNull((Object)jobLeaderService);
        this.jobManagerConnections = new HashMap<ResourceID, JobManagerConnection>(4);
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(this.getResourceID(), new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(this.getResourceID(), new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
    }

    @Override
    public void start() throws Exception {
        super.start();
        try {
            this.haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
        }
        catch (Exception e) {
            this.onFatalError(e);
        }
        this.taskSlotTable.start(new SlotActionsImpl());
        this.jobLeaderService.start(this.getAddress(), this.getRpcService(), this.haServices, new JobLeaderListenerImpl());
    }

    @Override
    public void postStop() throws Exception {
        this.log.info("Stopping TaskManager {}.", (Object)this.getAddress());
        Throwable throwable = null;
        this.taskSlotTable.stop();
        if (this.isConnectedToResourceManager()) {
            this.resourceManagerConnection.close();
        }
        for (JobManagerConnection jobManagerConnection : this.jobManagerConnections.values()) {
            try {
                this.disassociateFromJobManager(jobManagerConnection, (Exception)((Object)new FlinkException("The TaskExecutor is shutting down.")));
            }
            catch (Throwable t) {
                throwable = ExceptionUtils.firstOrSuppressed((Throwable)t, throwable);
            }
        }
        this.jobManagerHeartbeatManager.stop();
        this.resourceManagerHeartbeatManager.stop();
        this.ioManager.shutdown();
        this.memoryManager.shutdown();
        this.networkEnvironment.shutdown();
        this.fileCache.shutdown();
        try {
            super.postStop();
        }
        catch (Throwable e) {
            throwable = ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)throwable);
        }
        if (throwable != null) {
            ExceptionUtils.rethrowException((Throwable)throwable, (String)"Error while shutting the TaskExecutor down.");
        }
        this.log.info("Stopped TaskManager {}.", (Object)this.getAddress());
    }

    @Override
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
        try {
            boolean taskAdded;
            TaskInformation taskInformation;
            JobInformation jobInformation;
            JobID jobId = tdd.getJobId();
            JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobId);
            if (jobManagerConnection == null) {
                String message = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            if (!Objects.equals((Object)jobManagerConnection.getJobMasterId(), (Object)jobMasterId)) {
                String message = "Rejecting the task submission because the job manager leader id " + (Object)((Object)jobMasterId) + " does not match the expected job manager leader id " + (Object)((Object)jobManagerConnection.getJobMasterId()) + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            if (!this.taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) {
                String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + (Object)((Object)tdd.getAllocationId()) + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            BlobCacheService blobCache = jobManagerConnection.getBlobService();
            try {
                tdd.loadBigData(blobCache.getPermanentBlobService());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }
            try {
                jobInformation = (JobInformation)tdd.getSerializedJobInformation().deserializeValue(this.getClass().getClassLoader());
                taskInformation = (TaskInformation)tdd.getSerializedTaskInformation().deserializeValue(this.getClass().getClassLoader());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
            }
            if (!jobId.equals((Object)jobInformation.getJobId())) {
                throw new TaskSubmissionException("Inconsistent job ID information inside TaskDeploymentDescriptor (" + tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
            }
            TaskMetricGroup taskMetricGroup = this.taskManagerMetricGroup.addTaskForJob(jobInformation.getJobId(), jobInformation.getJobName(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), taskInformation.getTaskName(), tdd.getSubtaskIndex(), tdd.getAttemptNumber());
            RpcInputSplitProvider inputSplitProvider = new RpcInputSplitProvider(jobManagerConnection.getJobManagerGateway(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), this.taskManagerConfiguration.getTimeout());
            TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            BlobCacheService blobService = jobManagerConnection.getBlobService();
            LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
            PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
            Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(), tdd.getAllocationId(), tdd.getSubtaskIndex(), tdd.getAttemptNumber(), tdd.getProducedPartitions(), tdd.getInputGates(), tdd.getTargetSlotNumber(), tdd.getTaskStateHandles(), this.memoryManager, this.ioManager, this.networkEnvironment, this.broadcastVariableManager, taskManagerActions, inputSplitProvider, checkpointResponder, blobService, libraryCache, this.fileCache, this.taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, this.getRpcService().getExecutor());
            this.log.info("Received task {}.", (Object)task.getTaskInfo().getTaskNameWithSubtasks());
            try {
                taskAdded = this.taskSlotTable.addTask(task);
            }
            catch (SlotNotActiveException | SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }
            if (taskAdded) {
                task.startTaskThread();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            String message = "TaskManager already contains a task for id " + (Object)((Object)task.getExecutionId()) + '.';
            this.log.debug(message);
            throw new TaskSubmissionException(message);
        }
        catch (TaskSubmissionException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.cancelExecution();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Throwable t) {
                return FutureUtils.completedExceptionally(new TaskException("Cannot cancel task for execution " + (Object)((Object)executionAttemptID) + '.', t));
            }
        }
        String message = "Cannot find task to stop for execution " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }

    @Override
    public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.stopExecution();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Throwable t) {
                return FutureUtils.completedExceptionally(new TaskException("Cannot stop task for execution " + (Object)((Object)executionAttemptID) + '.', t));
            }
        }
        String message = "Cannot find task to stop for execution " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }

    @Override
    public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            for (PartitionInfo partitionInfo : partitionInfos) {
                IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
                SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
                if (singleInputGate != null) {
                    this.getRpcService().execute(() -> {
                        try {
                            singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
                        }
                        catch (IOException | InterruptedException e) {
                            this.log.error("Could not update input data location for task {}. Trying to fail task.", (Object)task.getTaskInfo().getTaskName(), (Object)e);
                            try {
                                task.failExternally(e);
                            }
                            catch (RuntimeException re) {
                                this.log.error("Failed canceling task with execution ID {} after task update failure.", (Object)executionAttemptID, (Object)re);
                            }
                        }
                    });
                    continue;
                }
                return FutureUtils.completedExceptionally(new PartitionException("No reader with ID " + (Object)((Object)intermediateResultPartitionID) + " for task " + (Object)((Object)executionAttemptID) + " was found."));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        this.log.debug("Discard update for input partitions of task {}. Task is no longer running.", (Object)executionAttemptID);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public void failPartition(ExecutionAttemptID executionAttemptID) {
        this.log.info("Discarding the results produced by task execution {}.", (Object)executionAttemptID);
        try {
            this.networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
        }
        catch (Throwable t) {
            this.onFatalError(t);
        }
    }

    @Override
    public void heartbeatFromJobManager(ResourceID resourceID) {
        this.jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
        this.log.debug("Trigger checkpoint {}@{} for {}.", new Object[]{checkpointId, checkpointTimestamp, executionAttemptID});
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        String message = "TaskManager received a checkpoint request for unknown task " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }

    @Override
    public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
        this.log.debug("Confirm checkpoint {}@{} for {}.", new Object[]{checkpointId, checkpointTimestamp, executionAttemptID});
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.notifyCheckpointComplete(checkpointId);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        String message = "TaskManager received a checkpoint confirmation for unknown task " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
        this.log.info("Receive slot request {} for job {} from resource manager with leader id {}.", new Object[]{allocationId, jobId, resourceManagerId});
        try {
            if (this.resourceManagerConnection == null) {
                String message = "TaskManager is not connected to a resource manager.";
                this.log.debug("TaskManager is not connected to a resource manager.");
                throw new SlotAllocationException("TaskManager is not connected to a resource manager.");
            }
            if (!Objects.equals(this.resourceManagerConnection.getTargetLeaderId(), (Object)resourceManagerId)) {
                String message = "The leader id " + (Object)((Object)resourceManagerId) + " does not match with the leader id of the connected resource manager " + this.resourceManagerConnection.getTargetLeaderId() + '.';
                this.log.debug(message);
                throw new SlotAllocationException(message);
            }
            if (this.taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                if (!this.taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, this.taskManagerConfiguration.getTimeout())) {
                    this.log.info("Could not allocate slot for {}.", (Object)allocationId);
                    throw new SlotAllocationException("Could not allocate slot.");
                }
                this.log.info("Allocated slot for {}.", (Object)allocationId);
            } else if (!this.taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
                String message = "The slot " + slotId + " has already been allocated for a different job.";
                this.log.info(message);
                throw new SlotOccupiedException(message, this.taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
            }
            if (this.jobManagerTable.contains(jobId)) {
                this.offerSlotsToJobManager(jobId);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            try {
                this.jobLeaderService.addJob(jobId, targetAddress);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Exception e) {
                try {
                    this.taskSlotTable.freeSlot(allocationId);
                }
                catch (SlotNotFoundException slotNotFoundException) {
                    this.onFatalError(slotNotFoundException);
                }
                if (this.taskSlotTable.isSlotFree(slotId.getSlotNumber())) throw new SlotAllocationException("Could not add job to job leader service.", e);
                this.onFatalError(new Exception("Could not free slot " + slotId));
                throw new SlotAllocationException("Could not add job to job leader service.", e);
            }
        }
        catch (SlotAllocationException slotAllocationException) {
            return FutureUtils.completedExceptionally(slotAllocationException);
        }
    }

    @Override
    public void disconnectJobManager(JobID jobId, Exception cause) {
        this.closeJobManagerConnection(jobId, cause);
    }

    @Override
    public void disconnectResourceManager(Exception cause) {
        this.closeResourceManagerConnection(cause);
    }

    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        if (this.resourceManagerConnection != null) {
            if (newLeaderAddress != null) {
                this.log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", (Object)this.resourceManagerConnection.getTargetAddress(), (Object)newLeaderAddress);
            } else {
                this.log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", (Object)this.resourceManagerConnection.getTargetAddress());
            }
            if (this.resourceManagerConnection != null) {
                this.resourceManagerConnection.close();
                this.resourceManagerConnection = null;
            }
        }
        if (newLeaderAddress != null) {
            this.log.info("Attempting to register at ResourceManager {}", (Object)newLeaderAddress);
            this.resourceManagerConnection = new TaskExecutorToResourceManagerConnection(this.log, this.getRpcService(), this.getAddress(), this.getResourceID(), this.taskSlotTable.createSlotReport(this.getResourceID()), newLeaderAddress, newResourceManagerId, this.getMainThreadExecutor(), new ResourceManagerRegistrationListener());
            this.resourceManagerConnection.start();
        }
    }

    private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) {
        this.resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<SlotReport>(){

            @Override
            public void receiveHeartbeat(ResourceID resourceID, SlotReport slotReport) {
                ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)TaskExecutor.this.resourceManagerConnection.getTargetGateway();
                resourceManagerGateway.heartbeatFromTaskManager(resourceID, slotReport);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) {
            }
        });
    }

    private void closeResourceManagerConnection(Exception cause) {
        if (this.isConnectedToResourceManager()) {
            this.log.info("Close ResourceManager connection {}.", (Object)this.resourceManagerConnection.getResourceManagerId(), (Object)cause);
            this.resourceManagerHeartbeatManager.unmonitorTarget(this.resourceManagerConnection.getResourceManagerId());
            ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            resourceManagerGateway.disconnectTaskManager(this.getResourceID(), cause);
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
    }

    private void offerSlotsToJobManager(JobID jobId) {
        JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobId);
        if (jobManagerConnection == null) {
            this.log.debug("There is no job manager connection to the leader of job {}.", (Object)jobId);
        } else if (this.taskSlotTable.hasAllocatedSlots(jobId)) {
            this.log.info("Offer reserved slots to the leader of job {}.", (Object)jobId);
            JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
            Iterator<TaskSlot> reservedSlotsIterator = this.taskSlotTable.getAllocatedSlots(jobId);
            JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
            HashSet<SlotOffer> reservedSlots = new HashSet<SlotOffer>(2);
            while (reservedSlotsIterator.hasNext()) {
                SlotOffer offer;
                block7: {
                    offer = reservedSlotsIterator.next().generateSlotOffer();
                    try {
                        if (this.taskSlotTable.markSlotActive(offer.getAllocationId())) break block7;
                        String message = "Could not mark slot " + jobId + " active.";
                        this.log.debug(message);
                        jobMasterGateway.failSlot(this.getResourceID(), offer.getAllocationId(), new Exception(message));
                    }
                    catch (SlotNotFoundException e) {
                        String message = "Could not mark slot " + jobId + " active.";
                        jobMasterGateway.failSlot(this.getResourceID(), offer.getAllocationId(), new Exception(message));
                        continue;
                    }
                }
                reservedSlots.add(offer);
            }
            CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(this.getResourceID(), reservedSlots, this.taskManagerConfiguration.getTimeout());
            acceptedSlotsFuture.whenCompleteAsync((acceptedSlots, throwable) -> {
                if (throwable != null) {
                    if (throwable instanceof TimeoutException) {
                        this.log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                        this.offerSlotsToJobManager(jobId);
                    } else {
                        this.log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", throwable);
                        for (SlotOffer reservedSlot : reservedSlots) {
                            this.freeSlot(reservedSlot.getAllocationId(), (Throwable)throwable);
                        }
                    }
                } else if (this.isJobManagerConnectionValid(jobId, jobMasterId)) {
                    for (SlotOffer acceptedSlot : acceptedSlots) {
                        reservedSlots.remove(acceptedSlot);
                    }
                    Exception e = new Exception("The slot was rejected by the JobManager.");
                    for (SlotOffer rejectedSlot : reservedSlots) {
                        this.freeSlot(rejectedSlot.getAllocationId(), e);
                    }
                } else {
                    this.log.debug("Discard offer slot response since there is a new leader for the job {}.", (Object)jobId);
                }
            }, (Executor)this.getMainThreadExecutor());
        } else {
            this.log.debug("There are no unassigned slots for the job {}.", (Object)jobId);
        }
    }

    private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
        if (this.jobManagerTable.contains(jobId)) {
            JobManagerConnection oldJobManagerConnection = this.jobManagerTable.get(jobId);
            if (Objects.equals((Object)oldJobManagerConnection.getJobMasterId(), jobMasterGateway.getFencingToken())) {
                this.log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken());
                return;
            }
            this.closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
        }
        this.log.info("Establish JobManager connection for job {}.", (Object)jobId);
        ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
        JobManagerConnection newJobManagerConnection = this.associateWithJobManager(jobId, jobManagerResourceID, jobMasterGateway, registrationSuccess.getBlobPort());
        this.jobManagerConnections.put(jobManagerResourceID, newJobManagerConnection);
        this.jobManagerTable.put(jobId, newJobManagerConnection);
        this.jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>(){

            @Override
            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                jobMasterGateway.heartbeatFromTaskManager(resourceID);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
            }
        });
        this.offerSlotsToJobManager(jobId);
    }

    private void closeJobManagerConnection(JobID jobId, Exception cause) {
        this.log.info("Close JobManager connection for job {}.", (Object)jobId);
        Iterator<Task> tasks = this.taskSlotTable.getTasks(jobId);
        while (tasks.hasNext()) {
            tasks.next().failExternally(new Exception("JobManager responsible for " + jobId + " lost the leadership."));
        }
        Iterator<AllocationID> activeSlots = this.taskSlotTable.getActiveSlots(jobId);
        while (activeSlots.hasNext()) {
            AllocationID activeSlot = activeSlots.next();
            try {
                if (this.taskSlotTable.markSlotInactive(activeSlot, this.taskManagerConfiguration.getTimeout())) continue;
                this.freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
            }
            catch (SlotNotFoundException e) {
                this.log.debug("Could not mark the slot {} inactive.", (Object)jobId, (Object)e);
            }
        }
        JobManagerConnection jobManagerConnection = this.jobManagerTable.remove(jobId);
        if (jobManagerConnection != null) {
            try {
                this.jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
                this.jobManagerConnections.remove(jobManagerConnection.getResourceID());
                this.disassociateFromJobManager(jobManagerConnection, cause);
            }
            catch (IOException e) {
                this.log.warn("Could not properly disassociate from JobManager {}.", (Object)jobManagerConnection.getJobManagerGateway().getAddress(), (Object)e);
            }
        }
    }

    private JobManagerConnection associateWithJobManager(JobID jobID, ResourceID resourceID, JobMasterGateway jobMasterGateway, int blobPort) {
        BlobLibraryCacheManager libraryCacheManager;
        BlobCacheService blobService;
        Preconditions.checkNotNull((Object)jobID);
        Preconditions.checkNotNull((Object)resourceID);
        Preconditions.checkNotNull((Object)jobMasterGateway);
        Preconditions.checkArgument((blobPort > 0 || blobPort < 65536 ? 1 : 0) != 0, (Object)"Blob server port is out of range.");
        TaskManagerActionsImpl taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
        RpcCheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
        InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort);
        try {
            blobService = new BlobCacheService(blobServerAddress, this.taskManagerConfiguration.getConfiguration(), this.haServices.createBlobStore());
            libraryCacheManager = new BlobLibraryCacheManager(blobService.getPermanentBlobService(), this.taskManagerConfiguration.getClassLoaderResolveOrder(), this.taskManagerConfiguration.getAlwaysParentFirstLoaderPatterns());
        }
        catch (IOException e) {
            String message = "Could not create BLOB cache or library cache.";
            this.log.error("Could not create BLOB cache or library cache.", (Throwable)e);
            throw new RuntimeException("Could not create BLOB cache or library cache.", e);
        }
        RpcResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(jobMasterGateway, this.getRpcService().getExecutor(), this.taskManagerConfiguration.getTimeout());
        RpcPartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
        return new JobManagerConnection(jobID, resourceID, jobMasterGateway, taskManagerActions, checkpointResponder, blobService, libraryCacheManager, resultPartitionConsumableNotifier, partitionStateChecker);
    }

    private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
        Preconditions.checkNotNull((Object)jobManagerConnection);
        JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
        jobManagerGateway.disconnectTaskManager(this.getResourceID(), cause);
        jobManagerConnection.getLibraryCacheManager().shutdown();
        jobManagerConnection.getBlobService().close();
    }

    private void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.failExternally(cause);
            }
            catch (Throwable t) {
                this.log.error("Could not fail task {}.", (Object)executionAttemptID, (Object)t);
            }
        } else {
            this.log.debug("Cannot find task to fail for execution {}.", (Object)executionAttemptID);
        }
    }

    private void updateTaskExecutionState(JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
        CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
        futureAcknowledge.whenCompleteAsync((ack, throwable) -> {
            if (throwable != null) {
                this.failTask(executionAttemptID, (Throwable)throwable);
            }
        }, (Executor)this.getMainThreadExecutor());
    }

    private void unregisterTaskAndNotifyFinalState(JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        Task task = this.taskSlotTable.removeTask(executionAttemptID);
        if (task != null) {
            if (!task.getExecutionState().isTerminal()) {
                try {
                    task.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
                }
                catch (Exception e) {
                    this.log.error("Could not properly fail task.", (Throwable)e);
                }
            }
            this.log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.", new Object[]{task.getExecutionState(), task.getTaskInfo().getTaskName(), task.getExecutionId()});
            AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
            this.updateTaskExecutionState(jobMasterGateway, new TaskExecutionState(task.getJobID(), task.getExecutionId(), task.getExecutionState(), task.getFailureCause(), accumulatorSnapshot, task.getMetricGroup().getIOMetricGroup().createSnapshot()));
        } else {
            this.log.error("Cannot find task with ID {} to unregister.", (Object)executionAttemptID);
        }
    }

    private void freeSlot(AllocationID allocationId, Throwable cause) {
        Preconditions.checkNotNull((Object)((Object)allocationId));
        try {
            int freedSlotIndex = this.taskSlotTable.freeSlot(allocationId, cause);
            if (freedSlotIndex != -1 && this.isConnectedToResourceManager()) {
                ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
                resourceManagerGateway.notifySlotAvailable(this.resourceManagerConnection.getRegistrationId(), new SlotID(this.getResourceID(), freedSlotIndex), allocationId);
            }
        }
        catch (SlotNotFoundException e) {
            this.log.debug("Could not free slot for allocation id {}.", (Object)allocationId, (Object)e);
        }
    }

    private void freeSlot(AllocationID allocationId) {
        this.freeSlot(allocationId, new Exception("The slot " + (Object)((Object)allocationId) + " is beeing freed."));
    }

    private void timeoutSlot(AllocationID allocationId, UUID ticket) {
        Preconditions.checkNotNull((Object)((Object)allocationId));
        Preconditions.checkNotNull((Object)ticket);
        if (this.taskSlotTable.isValidTimeout(allocationId, ticket)) {
            this.freeSlot(allocationId, new Exception("The slot " + (Object)((Object)allocationId) + " has timed out."));
        } else {
            this.log.debug("Received an invalid timeout for allocation id {} with ticket {}.", (Object)allocationId, (Object)ticket);
        }
    }

    private boolean isConnectedToResourceManager() {
        return this.resourceManagerConnection != null && this.resourceManagerConnection.isConnected();
    }

    private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId jobMasterId) {
        JobManagerConnection jmConnection = this.jobManagerTable.get(jobId);
        return jmConnection != null && Objects.equals((Object)jmConnection.getJobMasterId(), (Object)jobMasterId);
    }

    public ResourceID getResourceID() {
        return this.taskManagerLocation.getResourceID();
    }

    void onFatalError(Throwable t) {
        try {
            this.log.error("Fatal error occurred in TaskExecutor.", t);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.fatalErrorHandler.onFatalError(t);
    }

    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return this.resourceManagerConnection;
    }

    @VisibleForTesting
    HeartbeatManager<Void, SlotReport> getResourceManagerHeartbeatManager() {
        return this.resourceManagerHeartbeatManager;
    }

    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, SlotReport> {
        private ResourceManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                    TaskExecutor.this.closeResourceManagerConnection(new TimeoutException("The heartbeat of ResourceManager with id " + resourceId + " timed out."));
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public CompletableFuture<SlotReport> retrievePayload() {
            return TaskExecutor.this.callAsync(() -> TaskExecutor.this.taskSlotTable.createSlotReport(TaskExecutor.this.getResourceID()), TaskExecutor.this.taskManagerConfiguration.getTimeout());
        }
    }

    private class JobManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private JobManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    JobManagerConnection jobManagerConnection;
                    TaskExecutor.this.log.info("The heartbeat of JobManager with id {} timed out.", (Object)resourceID);
                    if (TaskExecutor.this.jobManagerConnections.containsKey(resourceID) && (jobManagerConnection = (JobManagerConnection)TaskExecutor.this.jobManagerConnections.get(resourceID)) != null) {
                        TaskExecutor.this.closeJobManagerConnection(jobManagerConnection.getJobID(), new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
                    }
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public CompletableFuture<Void> retrievePayload() {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class SlotActionsImpl
    implements SlotActions {
        private SlotActionsImpl() {
        }

        @Override
        public void freeSlot(final AllocationID allocationId) {
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.freeSlot(allocationId);
                }
            });
        }

        @Override
        public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.timeoutSlot(allocationId, ticket);
                }
            });
        }
    }

    private final class TaskManagerActionsImpl
    implements TaskManagerActions {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway)Preconditions.checkNotNull((Object)jobMasterGateway);
        }

        @Override
        public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.unregisterTaskAndNotifyFinalState(TaskManagerActionsImpl.this.jobMasterGateway, executionAttemptID);
                }
            });
        }

        @Override
        public void notifyFatalError(String message, Throwable cause) {
            try {
                TaskExecutor.this.log.error(message, cause);
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            TaskExecutor.this.fatalErrorHandler.onFatalError(cause);
        }

        @Override
        public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.failTask(executionAttemptID, cause);
                }
            });
        }

        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            TaskExecutor.this.updateTaskExecutionState(this.jobMasterGateway, taskExecutionState);
        }
    }

    private final class ResourceManagerRegistrationListener
    implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
        private ResourceManagerRegistrationListener() {
        }

        @Override
        public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
            final ResourceID resourceManagerId = success.getResourceManagerId();
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.establishResourceManagerConnection(resourceManagerId);
                }
            });
        }

        @Override
        public void onRegistrationFailure(Throwable failure) {
            TaskExecutor.this.onFatalError(failure);
        }
    }

    private final class JobLeaderListenerImpl
    implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        @Override
        public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.establishJobManagerConnection(jobId, jobManagerGateway, registrationMessage));
        }

        @Override
        public void jobManagerLostLeadership(final JobID jobId, JobMasterId jobMasterId) {
            TaskExecutor.this.log.info("JobManager for job {} with leader id {} lost leadership.", (Object)jobId, (Object)jobMasterId);
            TaskExecutor.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.closeJobManagerConnection(jobId, new Exception("Job leader for job id " + jobId + " lost leadership."));
                }
            });
        }

        @Override
        public void handleError(Throwable throwable) {
            TaskExecutor.this.onFatalError(throwable);
        }
    }

    private final class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
        }

        @Override
        public void handleError(Exception exception) {
            TaskExecutor.this.onFatalError(exception);
        }
    }
}

