/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.ExecutionGraphException;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.slf4j.Logger;

public class JobMaster
extends FencedRpcEndpoint<JobMasterId>
implements JobMasterGateway {
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";
    private final JobMasterGateway selfGateway = this.getSelfGateway(JobMasterGateway.class);
    private final ResourceID resourceId;
    private final JobGraph jobGraph;
    private final Configuration configuration;
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobServer blobServer;
    private final BlobLibraryCacheManager libraryCacheManager;
    private final MetricGroup jobManagerMetricGroup;
    private final MetricGroup jobMetricGroup;
    private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    private final Executor executor;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler errorHandler;
    private final ClassLoader userCodeLoader;
    private final ExecutionGraph executionGraph;
    private final SlotPool slotPool;
    private final SlotPoolGateway slotPoolGateway;
    private LeaderRetrievalService resourceManagerLeaderRetriever;
    private ResourceManagerConnection resourceManagerConnection;
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    public JobMaster(RpcService rpcService, ResourceID resourceId, JobGraph jobGraph, Configuration configuration, HighAvailabilityServices highAvailabilityService, HeartbeatServices heartbeatServices, ScheduledExecutorService executor, BlobServer blobServer, BlobLibraryCacheManager libraryCacheManager, RestartStrategyFactory restartStrategyFactory, Time rpcAskTimeout, @Nullable JobManagerMetricGroup jobManagerMetricGroup, OnCompletionActions jobCompletionActions, FatalErrorHandler errorHandler, ClassLoader userCodeLoader) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
        this.resourceId = (ResourceID)Preconditions.checkNotNull((Object)resourceId);
        this.jobGraph = (JobGraph)Preconditions.checkNotNull((Object)jobGraph);
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.rpcTimeout = rpcAskTimeout;
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityService);
        this.blobServer = (BlobServer)Preconditions.checkNotNull((Object)blobServer);
        this.libraryCacheManager = (BlobLibraryCacheManager)Preconditions.checkNotNull((Object)libraryCacheManager);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.jobCompletionActions = (OnCompletionActions)Preconditions.checkNotNull((Object)jobCompletionActions);
        this.errorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)errorHandler);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(this.selfGateway), rpcService.getScheduledExecutor(), this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(resourceId, new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        String jobName = jobGraph.getName();
        JobID jid = jobGraph.getJobID();
        if (jobManagerMetricGroup != null) {
            this.jobManagerMetricGroup = jobManagerMetricGroup;
            this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
        } else {
            this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
            this.jobMetricGroup = new UnregisteredMetricsGroup();
        }
        this.log.info("Initializing job {} ({}).", (Object)jobName, (Object)jid);
        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = ((ExecutionConfig)jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader)).getRestartStrategy();
        RestartStrategy restartStrategy = restartStrategyConfiguration != null ? RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : restartStrategyFactory.createRestartStrategy();
        this.log.info("Using restart strategy {} for {} ({}).", new Object[]{restartStrategy, jobName, jid});
        CheckpointRecoveryFactory checkpointRecoveryFactory = this.highAvailabilityServices.getCheckpointRecoveryFactory();
        this.resourceManagerLeaderRetriever = this.highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
        this.slotPoolGateway = this.slotPool.getSelfGateway(SlotPoolGateway.class);
        this.executionGraph = ExecutionGraphBuilder.buildGraph(null, jobGraph, configuration, executor, executor, this.slotPool.getSlotProvider(), userCodeLoader, checkpointRecoveryFactory, rpcAskTimeout, restartStrategy, this.jobMetricGroup, -1, blobServer, this.log);
        this.executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
        this.registeredTaskManagers = new HashMap<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>(4);
    }

    @Override
    public void start() {
        throw new UnsupportedOperationException("Should never call start() without leader ID");
    }

    public CompletableFuture<Acknowledge> start(JobMasterId newJobMasterId, Time timeout) throws Exception {
        super.start();
        return this.callAsyncWithoutFencing(() -> this.startJobExecution(newJobMasterId), timeout);
    }

    public CompletableFuture<Acknowledge> suspend(Throwable cause, Time timeout) {
        CompletableFuture<Acknowledge> suspendFuture = this.callAsyncWithoutFencing(() -> this.suspendExecution(cause), timeout);
        this.stop();
        return suspendFuture;
    }

    @Override
    public void postStop() throws Exception {
        this.taskManagerHeartbeatManager.stop();
        this.resourceManagerHeartbeatManager.stop();
        this.suspendExecution(new Exception("JobManager is shutting down."));
        super.postStop();
    }

    @Override
    public CompletableFuture<Acknowledge> cancel(Time timeout) {
        this.executionGraph.cancel();
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public CompletableFuture<Acknowledge> stop(Time timeout) {
        try {
            this.executionGraph.stop();
        }
        catch (StoppingException e) {
            return FutureUtils.completedExceptionally((Throwable)((Object)e));
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        Preconditions.checkNotNull((Object)taskExecutionState, (String)"taskExecutionState");
        if (this.executionGraph.updateState(taskExecutionState)) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new ExecutionGraphException("The execution attempt " + (Object)((Object)taskExecutionState.getID()) + " was not found.")));
    }

    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)executionAttempt);
        if (execution == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Can not find Execution for attempt {}.", (Object)executionAttempt);
            }
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + (Object)((Object)executionAttempt)));
        }
        ExecutionJobVertex vertex = this.executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            this.log.error("Cannot find execution vertex for vertex ID {}.", (Object)vertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + (Object)((Object)vertexID)));
        }
        InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
        if (splitAssigner == null) {
            this.log.error("No InputSplitAssigner for vertex ID {}.", (Object)vertexID);
            return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + (Object)((Object)vertexID)));
        }
        SimpleSlot slot = execution.getAssignedResource();
        int taskId = execution.getVertex().getParallelSubtaskIndex();
        String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Send next input split {}.", (Object)nextInputSplit);
        }
        try {
            byte[] serializedInputSplit = InstantiationUtil.serializeObject((Object)nextInputSplit);
            return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
        }
        catch (Exception ex) {
            this.log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), (Object)ex);
            IOException reason = new IOException("Could not serialize the next input split of class " + nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            return FutureUtils.completedExceptionally(reason);
        }
    }

    @Override
    public CompletableFuture<ExecutionState> requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)resultPartitionId.getProducerId());
        if (execution != null) {
            return CompletableFuture.completedFuture(execution.getState());
        }
        IntermediateResult intermediateResult = this.executionGraph.getAllIntermediateResults().get((Object)intermediateResultId);
        if (intermediateResult != null) {
            Execution producerExecution = intermediateResult.getPartitionById(resultPartitionId.getPartitionId()).getProducer().getCurrentExecutionAttempt();
            if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
                return CompletableFuture.completedFuture(producerExecution.getState());
            }
            return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId));
        }
        return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID " + (Object)((Object)intermediateResultId) + " not found."));
    }

    @Override
    public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) {
        try {
            this.executionGraph.scheduleOrUpdateConsumers(partitionID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override
    public void disconnectTaskManager(ResourceID resourceID, Exception cause) {
        this.taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        this.slotPoolGateway.releaseTaskManager(resourceID);
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = this.registeredTaskManagers.remove(resourceID);
        if (taskManagerConnection != null) {
            ((TaskExecutorGateway)taskManagerConnection.f1).disconnectJobManager(this.jobGraph.getJobID(), cause);
        }
    }

    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        final CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
        if (checkpointCoordinator != null) {
            this.getRpcService().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                    }
                    catch (Throwable t) {
                        JobMaster.this.log.warn("Error while processing checkpoint acknowledgement message");
                    }
                }
            });
        } else {
            this.log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", (Object)this.jobGraph.getJobID());
        }
    }

    @Override
    public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointID, Throwable reason) {
        final DeclineCheckpoint decline = new DeclineCheckpoint(jobID, executionAttemptID, checkpointID, reason);
        final CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            this.getRpcService().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        checkpointCoordinator.receiveDeclineMessage(decline);
                    }
                    catch (Exception e) {
                        JobMaster.this.log.error("Error in CheckpointCoordinator while processing {}", (Object)decline, (Object)e);
                    }
                }
            });
        } else {
            this.log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", (Object)this.jobGraph.getJobID());
        }
    }

    @Override
    public CompletableFuture<KvStateLocation> lookupKvStateLocation(String registrationName) {
        KvStateLocationRegistry registry;
        KvStateLocation location;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Lookup key-value state for job {} with registration name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
        }
        if ((location = (registry = this.executionGraph.getKvStateLocationRegistry()).getKvStateLocation(registrationName)) != null) {
            return CompletableFuture.completedFuture(location);
        }
        return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
    }

    @Override
    public void notifyKvStateRegistered(JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state registered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
        }
        catch (Exception e) {
            this.log.error("Failed to notify KvStateRegistry about registration {}.", (Object)registrationName);
        }
    }

    @Override
    public void notifyKvStateUnregistered(JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state unregistered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexId, keyGroupRange, registrationName);
        }
        catch (Exception e) {
            this.log.error("Failed to notify KvStateRegistry about registration {}.", (Object)registrationName);
        }
    }

    @Override
    public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
        return CompletableFuture.completedFuture(new ClassloadingProps(this.blobServer.getPort(), this.executionGraph.getRequiredJarFiles(), this.executionGraph.getRequiredClasspaths()));
    }

    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID taskManagerId, Iterable<SlotOffer> slots, Time timeout) {
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = this.registeredTaskManagers.get(taskManagerId);
        if (taskManager == null) {
            return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
        }
        JobID jid = this.jobGraph.getJobID();
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)taskManager.f0;
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskManager.f1;
        ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<Tuple2<AllocatedSlot, SlotOffer>>();
        RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, (JobMasterId)((Object)this.getFencingToken()));
        for (SlotOffer slotOffer : slots) {
            AllocatedSlot slot = new AllocatedSlot(slotOffer.getAllocationId(), jid, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), rpcTaskManagerGateway);
            slotsAndOffers.add((Tuple2<AllocatedSlot, SlotOffer>)new Tuple2((Object)slot, (Object)slotOffer));
        }
        return this.slotPoolGateway.offerSlots(slotsAndOffers);
    }

    @Override
    public void failSlot(ResourceID taskManagerId, AllocationID allocationId, Exception cause) {
        if (this.registeredTaskManagers.containsKey(taskManagerId)) {
            this.slotPoolGateway.failAllocation(allocationId, cause);
        } else {
            this.log.warn("Cannot fail slot " + (Object)((Object)allocationId) + " because the TaskManager " + taskManagerId + " is unknown.");
        }
    }

    @Override
    public CompletableFuture<RegistrationResponse> registerTaskManager(String taskManagerRpcAddress, TaskManagerLocation taskManagerLocation, Time timeout) {
        ResourceID taskManagerId = taskManagerLocation.getResourceID();
        if (this.registeredTaskManagers.containsKey(taskManagerId)) {
            JMTMRegistrationSuccess response = new JMTMRegistrationSuccess(this.resourceId, this.blobServer.getPort());
            return CompletableFuture.completedFuture(response);
        }
        return this.getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class).handleAsync((taskExecutorGateway, throwable) -> {
            if (throwable != null) {
                return new RegistrationResponse.Decline(throwable.getMessage());
            }
            this.slotPoolGateway.registerTaskManager(taskManagerId);
            this.registeredTaskManagers.put(taskManagerId, (Tuple2<TaskManagerLocation, TaskExecutorGateway>)Tuple2.of((Object)taskManagerLocation, (Object)taskExecutorGateway));
            this.taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>(){

                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    taskExecutorGateway.heartbeatFromJobManager(resourceID);
                }
            });
            return new JMTMRegistrationSuccess(this.resourceId, this.blobServer.getPort());
        }, (Executor)this.getMainThreadExecutor());
    }

    @Override
    public void disconnectResourceManager(ResourceManagerId resourceManagerId, Exception cause) {
        if (this.resourceManagerConnection != null && ((ResourceManagerId)((Object)this.resourceManagerConnection.getTargetLeaderId())).equals((Object)resourceManagerId)) {
            this.closeResourceManagerConnection(cause);
        }
    }

    @Override
    public void heartbeatFromTaskManager(ResourceID resourceID) {
        this.taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
    }

    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    @Override
    public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
        return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(this.executionGraph), this.executor);
    }

    @Override
    public CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(Time timeout) {
        return CompletableFuture.completedFuture(this.executionGraph.archive());
    }

    @Override
    public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
        return CompletableFuture.completedFuture(this.executionGraph.getState());
    }

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        this.validateRunsInMainThread();
        Preconditions.checkNotNull((Object)((Object)newJobMasterId), (String)"The new JobMasterId must not be null.");
        if (Objects.equals(this.getFencingToken(), (Object)newJobMasterId)) {
            this.log.info("Already started the job execution with JobMasterId {}.", (Object)newJobMasterId);
            return Acknowledge.get();
        }
        if (this.getFencingToken() != null) {
            this.log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", this.getFencingToken(), (Object)newJobMasterId);
            this.suspendExecution(new FlinkException("Old job with JobMasterId " + this.getFencingToken() + " is restarted with a new JobMasterId " + (Object)((Object)newJobMasterId) + '.'));
        }
        this.setFencingToken(newJobMasterId);
        this.log.info("Starting execution of job {} ({})", (Object)this.jobGraph.getName(), (Object)this.jobGraph.getJobID());
        try {
            this.log.debug("Staring SlotPool component");
            this.slotPool.start((JobMasterId)((Object)this.getFencingToken()), this.getAddress());
            this.resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        }
        catch (Throwable t) {
            this.log.error("Failed to start job {} ({})", new Object[]{this.jobGraph.getName(), this.jobGraph.getJobID(), t});
            throw new Exception("Could not start job execution: Failed to start JobMaster services.", t);
        }
        this.executor.execute(() -> {
            try {
                this.executionGraph.scheduleForExecution();
            }
            catch (Throwable t) {
                this.executionGraph.failGlobal(t);
            }
        });
        return Acknowledge.get();
    }

    private Acknowledge suspendExecution(Throwable cause) {
        this.validateRunsInMainThread();
        if (this.getFencingToken() == null) {
            this.log.debug("Job has already been suspended or shutdown.");
            return Acknowledge.get();
        }
        this.setFencingToken(null);
        try {
            this.resourceManagerLeaderRetriever.stop();
        }
        catch (Throwable t) {
            this.log.warn("Failed to stop resource manager leader retriever when suspending.", t);
        }
        this.executionGraph.suspend(cause);
        this.slotPoolGateway.suspend();
        this.closeResourceManagerConnection(new Exception("Execution was suspended.", cause));
        return Acknowledge.get();
    }

    private void handleFatalError(Throwable cause) {
        try {
            this.log.error("Fatal error occurred on JobManager.", cause);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.errorHandler.onFatalError(cause);
    }

    private void jobStatusChanged(JobStatus newJobStatus, long timestamp, @Nullable Throwable error) {
        this.validateRunsInMainThread();
        JobID jobID = this.executionGraph.getJobID();
        String jobName = this.executionGraph.getJobName();
        if (newJobStatus.isGloballyTerminalState()) {
            switch (newJobStatus) {
                case FINISHED: {
                    try {
                        Map<String, Object> accumulatorResults = this.executionGraph.getAccumulators();
                        JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
                        this.executor.execute(() -> this.jobCompletionActions.jobFinished(result));
                    }
                    catch (Exception e) {
                        this.log.error("Cannot fetch final accumulators for job {} ({})", new Object[]{jobName, jobID, e});
                        JobExecutionException exception = new JobExecutionException(jobID, "Failed to retrieve accumulator results. The job is registered as 'FINISHED (successful), but this notification describes a failure, since the resulting accumulators could not be fetched.", e);
                        this.executor.execute(() -> this.jobCompletionActions.jobFailed((Throwable)((Object)exception)));
                    }
                    break;
                }
                case CANCELED: {
                    JobExecutionException exception = new JobExecutionException(jobID, "Job was cancelled.", new Exception("The job was cancelled"));
                    this.executor.execute(() -> this.jobCompletionActions.jobFailed((Throwable)((Object)exception)));
                    break;
                }
                case FAILED: {
                    Throwable unpackedError = SerializedThrowable.get((Throwable)error, (ClassLoader)this.userCodeLoader);
                    JobExecutionException exception = new JobExecutionException(jobID, "Job execution failed.", unpackedError);
                    this.executor.execute(() -> this.jobCompletionActions.jobFailed((Throwable)((Object)exception)));
                    break;
                }
                default: {
                    throw new IllegalStateException(newJobStatus.toString());
                }
            }
        }
    }

    private void notifyOfNewResourceManagerLeader(String resourceManagerAddress, ResourceManagerId resourceManagerId) {
        if (this.resourceManagerConnection != null) {
            if (resourceManagerAddress != null) {
                if (Objects.equals(resourceManagerAddress, this.resourceManagerConnection.getTargetAddress()) && Objects.equals((Object)resourceManagerId, this.resourceManagerConnection.getTargetLeaderId())) {
                    return;
                }
                this.closeResourceManagerConnection(new Exception("ResourceManager leader changed to new address " + resourceManagerAddress));
                this.log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", (Object)this.resourceManagerConnection.getTargetAddress(), (Object)resourceManagerAddress);
            } else {
                this.log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", (Object)this.resourceManagerConnection.getTargetAddress());
            }
        }
        if (resourceManagerAddress != null) {
            this.log.info("Attempting to register at ResourceManager {}", (Object)resourceManagerAddress);
            this.resourceManagerConnection = new ResourceManagerConnection(this.log, this.jobGraph.getJobID(), this.resourceId, this.getAddress(), (JobMasterId)((Object)this.getFencingToken()), resourceManagerAddress, resourceManagerId, this.executor);
            this.resourceManagerConnection.start();
        }
    }

    private void establishResourceManagerConnection(JobMasterRegistrationSuccess success) {
        ResourceManagerId resourceManagerId = success.getResourceManagerId();
        if (this.resourceManagerConnection != null && Objects.equals(this.resourceManagerConnection.getTargetLeaderId(), (Object)resourceManagerId)) {
            this.log.info("JobManager successfully registered at ResourceManager, leader id: {}.", (Object)resourceManagerId);
            final ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            this.slotPoolGateway.connectToResourceManager(resourceManagerGateway);
            this.resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>(){

                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    resourceManagerGateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                }
            });
        } else {
            this.log.debug("Ignoring resource manager connection to {} because its a duplicate or outdated.", (Object)resourceManagerId);
        }
    }

    private void closeResourceManagerConnection(Exception cause) {
        if (this.resourceManagerConnection != null) {
            this.log.info("Close ResourceManager connection {}.", (Object)this.resourceManagerConnection.getResourceManagerResourceID(), (Object)cause);
            this.resourceManagerHeartbeatManager.unmonitorTarget(this.resourceManagerConnection.getResourceManagerResourceID());
            ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            resourceManagerGateway.disconnectJobManager(this.resourceManagerConnection.getJobID(), cause);
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
        this.slotPoolGateway.disconnectResourceManager();
    }

    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            JobMaster.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    JobMaster.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                    JobMaster.this.closeResourceManagerConnection(new TimeoutException("The heartbeat of ResourceManager with id " + resourceId + " timed out."));
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public CompletableFuture<Void> retrievePayload() {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class TaskManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway)Preconditions.checkNotNull((Object)jobMasterGateway);
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            JobMaster.this.log.info("Heartbeat of TaskManager with id {} timed out.", (Object)resourceID);
            this.jobMasterGateway.disconnectTaskManager(resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public CompletableFuture<Void> retrievePayload() {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class JobManagerJobStatusListener
    implements JobStatusListener {
        private JobManagerJobStatusListener() {
        }

        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            JobMaster.this.runAsync(() -> JobMaster.this.jobStatusChanged(newJobStatus, timestamp, error));
        }
    }

    private class ResourceManagerConnection
    extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        private final ResourceID jobManagerResourceID;
        private final String jobManagerRpcAddress;
        private final JobMasterId jobMasterId;
        private ResourceID resourceManagerResourceID;

        ResourceManagerConnection(Logger log, JobID jobID, ResourceID jobManagerResourceID, String jobManagerRpcAddress, JobMasterId jobMasterId, String resourceManagerAddress, ResourceManagerId resourceManagerId, Executor executor) {
            super(log, resourceManagerAddress, resourceManagerId, executor);
            this.jobID = (JobID)Preconditions.checkNotNull((Object)jobID);
            this.jobManagerResourceID = (ResourceID)Preconditions.checkNotNull((Object)jobManagerResourceID);
            this.jobManagerRpcAddress = (String)Preconditions.checkNotNull((Object)jobManagerRpcAddress);
            this.jobMasterId = (JobMasterId)((Object)Preconditions.checkNotNull((Object)((Object)jobMasterId)));
        }

        @Override
        protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
            return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(this.log, JobMaster.this.getRpcService(), "ResourceManager", ResourceManagerGateway.class, this.getTargetAddress(), (ResourceManagerId)((Object)this.getTargetLeaderId())){

                @Override
                protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
                    Time timeout = Time.milliseconds((long)timeoutMillis);
                    return gateway.registerJobManager(ResourceManagerConnection.this.jobMasterId, ResourceManagerConnection.this.jobManagerResourceID, ResourceManagerConnection.this.jobManagerRpcAddress, ResourceManagerConnection.this.jobID, timeout);
                }
            };
        }

        @Override
        protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
            JobMaster.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    ResourceManagerConnection.this.resourceManagerResourceID = success.getResourceManagerResourceId();
                    JobMaster.this.establishResourceManagerConnection(success);
                }
            });
        }

        @Override
        protected void onRegistrationFailure(Throwable failure) {
            JobMaster.this.handleFatalError(failure);
        }

        public ResourceID getResourceManagerResourceID() {
            return this.resourceManagerResourceID;
        }

        public JobID getJobID() {
            return this.jobID;
        }
    }

    private class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            JobMaster.this.runAsync(() -> JobMaster.this.notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
        }

        @Override
        public void handleError(Exception exception) {
            JobMaster.this.handleFatalError(new Exception("Fatal error in the ResourceManager leader service", exception));
        }
    }
}

