/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.TestingJobManagerSharedServicesBuilder;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobMasterTest
extends TestLogger {
    // Shared zero-length array of TestingInputSplit (the type is declared outside this part of the file).
    private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
    // Class-wide temporary directory; setup() creates the blob storage folder in it and
    // testDeclineCheckpointInvocationWithUserException compiles a class into a sub-folder.
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Upper bound used for RPC calls and for most blocking future waits in the tests.
    private static final Time testingTimeout = Time.seconds((long)10L);
    // Interval/timeout values of the "fast" heartbeat services built in setupClass()
    // (unit is defined by TestingHeartbeatServices - presumably milliseconds, confirm).
    private static final long fastHeartbeatInterval = 1L;
    private static final long fastHeartbeatTimeout = 5L;
    // Interval/timeout values of the regular heartbeat services built in setupClass().
    private static final long heartbeatInterval = 1000L;
    private static final long heartbeatTimeout = 5000L;
    // Job graph without any vertices, shared by tests that do not build their own graph.
    private static final JobGraph jobGraph = new JobGraph(new JobVertex[0]);
    // Created once in setupClass() and stopped in teardownClass(); gateways are cleared after every test.
    private static TestingRpcService rpcService;
    private static HeartbeatServices fastHeartbeatServices;
    private static HeartbeatServices heartbeatServices;
    // Per-test state, (re)initialised in setup().
    private BlobServer blobServer;
    private Configuration configuration;
    private ResourceID jmResourceId;
    private JobMasterId jobMasterId;
    private TestingHighAvailabilityServices haServices;
    // Lets a test announce a resource manager leader via notifyListener(address, uuid).
    private SettableLeaderRetrievalService rmLeaderRetrievalService;
    // Checked in teardown() via rethrowError().
    private TestingFatalErrorHandler testingFatalErrorHandler;

    /**
     * Creates the RPC service shared by all tests of this class together with the two heartbeat
     * service flavours (fast and regular), both scheduled on the RPC service's executor.
     */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();

        fastHeartbeatServices = new TestingHeartbeatServices(
            fastHeartbeatInterval,
            fastHeartbeatTimeout,
            rpcService.getScheduledExecutor());

        heartbeatServices = new TestingHeartbeatServices(
            heartbeatInterval,
            heartbeatTimeout,
            rpcService.getScheduledExecutor());
    }

    /**
     * Builds the per-test fixtures: a fresh configuration, ids, fatal error handler, high
     * availability services wired with a standalone checkpoint recovery factory and a settable
     * resource manager leader retriever, and finally a started blob server whose storage
     * directory lives in the class-wide temporary folder.
     *
     * @throws IOException if the storage folder cannot be created or the blob server fails to start
     */
    @Before
    public void setup() throws IOException {
        // Plain value fixtures first; none of these can fail with an IOException.
        configuration = new Configuration();
        haServices = new TestingHighAvailabilityServices();
        jobMasterId = JobMasterId.generate();
        jmResourceId = ResourceID.generate();
        testingFatalErrorHandler = new TestingFatalErrorHandler();

        // High availability wiring.
        rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

        // Blob server backed by a fresh folder below the shared temporary directory.
        final String blobStorageDirectory = temporaryFolder.newFolder().getAbsolutePath();
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, blobStorageDirectory);
        blobServer = new BlobServer(configuration, new VoidBlobStore());
        blobServer.start();
    }

    /**
     * Releases the per-test fixtures.
     *
     * <p>The fatal error handler is checked first so that an error recorded during the test
     * fails it. The blob server is closed and the shared RPC service's gateways are cleared in
     * {@code finally} blocks, so that a reported fatal error (or a failing close) does not leak
     * the blob server or leave stale gateways registered for the following tests.
     *
     * @throws Exception if the fatal error handler recorded an error or closing the blob server fails
     */
    @After
    public void teardown() throws Exception {
        try {
            if (this.testingFatalErrorHandler != null) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            try {
                if (this.blobServer != null) {
                    this.blobServer.close();
                }
            } finally {
                // Always reset the shared RPC service, even if one of the steps above threw.
                rpcService.clearGateways();
            }
        }
    }

    /**
     * Stops the shared RPC service, if one was created, and drops the reference to it.
     */
    @AfterClass
    public static void teardownClass() {
        if (rpcService == null) {
            return;
        }

        rpcService.stopService();
        rpcService = null;
    }

    /**
     * Sends a decline-checkpoint message whose reason is an exception class that is compiled at
     * runtime and loaded through a separate {@link URLClassLoader}, and verifies that the
     * JobMaster receives the reason as a {@link SerializedThrowable} carrying the same message.
     *
     * <p>Two dedicated Akka RPC services are used: one hosts the JobMaster, the other one
     * connects to it and sends the message. Both are terminated in a {@code finally} block.
     */
    @Test
    public void testDeclineCheckpointInvocationWithUserException() throws Exception {
        AkkaRpcService rpcService1 = null;
        AkkaRpcService rpcService2 = null;
        try {
            final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
            final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
            rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout);
            rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout);

            final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>();
            final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
            final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(this.configuration);

            // JobMaster whose declineCheckpoint only records the reason it was handed.
            final JobMaster jobMaster = new JobMaster(
                    rpcService1,
                    jobMasterConfiguration,
                    this.jmResourceId,
                    jobGraph,
                    this.haServices,
                    DefaultSlotPoolFactory.fromConfiguration(this.configuration, rpcService1),
                    jobManagerSharedServices,
                    heartbeatServices,
                    this.blobServer,
                    UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
                    new NoOpOnCompletionActions(),
                    this.testingFatalErrorHandler,
                    JobMasterTest.class.getClassLoader()) {

                @Override
                public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
                    declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
                }
            };
            jobMaster.start(this.jobMasterId, testingTimeout).get();

            // Compile and load an exception class that is not on the test class path.
            final String className = "UserException";
            final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava(
                    temporaryFolder.newFolder(),
                    className + ".java",
                    String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }", className, className));
            final Throwable userException = (Throwable) Class.forName(className, false, userClassLoader).newInstance();

            final JobMasterGateway jobMasterGateway = rpcService2
                    .connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class)
                    .get();
            final RpcCheckpointResponder rpcCheckpointResponder = new RpcCheckpointResponder(jobMasterGateway);
            rpcCheckpointResponder.declineCheckpoint(jobGraph.getJobID(), new ExecutionAttemptID(1L, 1L), 1L, userException);

            final Throwable declineReason = declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(declineReason, Matchers.instanceOf(SerializedThrowable.class));
            MatcherAssert.assertThat(declineReason.getMessage(), Matchers.equalTo(userException.getMessage()));
        } finally {
            // Single cleanup path for both success and failure.
            RpcUtils.terminateRpcServices(testingTimeout, rpcService1, rpcService2);
        }
    }

    /**
     * Registers a testing task executor at a started JobMaster and expects first a heartbeat
     * request carrying the JobMaster's resource id and then a disconnect-job-manager call for the
     * shared job (presumably caused by the heartbeat timing out, as the test name suggests).
     */
    @Test
    public void testHeartbeatTimeoutWithTaskManager() throws Exception {
        final long timeoutMillis = testingTimeout.toMilliseconds();
        final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
        final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
            .setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete)
            .setDisconnectJobManagerConsumer((jobId, cause) -> disconnectedJobManagerFuture.complete(jobId))
            .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        final JobMaster jobMaster = createJobMaster(configuration, jobGraph, haServices, jobManagerSharedServices);
        final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);

        try {
            startFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);

            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);

            // Wait until the task executor is registered at the JobMaster.
            jobMasterGateway
                .registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout)
                .get();

            final ResourceID heartbeatResourceId = heartbeatResourceIdFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(heartbeatResourceId, Matchers.equalTo(jmResourceId));

            final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
        } finally {
            jobManagerSharedServices.shutdown();
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Announces a testing resource manager as leader and expects the started JobMaster to
     * register with its own ids, then to be disconnected for the shared job (presumably due to
     * the heartbeat timing out, as the test name suggests), and finally to register a second
     * time.
     *
     * <p>All waits, including the one for the second registration attempt, are bounded by
     * {@code testingTimeout} so that a missing reconnect fails the test instead of blocking it
     * forever.
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        final String resourceManagerAddress = "rm";
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
            resourceManagerId,
            rmResourceId,
            resourceManagerAddress,
            "localhost");

        final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>();
        final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
        // Two registrations are expected: the initial one and the one after the disconnect.
        final CountDownLatch registrationAttempts = new CountDownLatch(2);

        resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> {
            jobManagerRegistrationFuture.complete(Tuple3.of(tuple.f0, tuple.f1, tuple.f3));
            registrationAttempts.countDown();
        });
        resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
        rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);

        final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        final JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, jobManagerSharedServices);
        final CompletableFuture<Acknowledge> startFuture = jobMaster.start(this.jobMasterId, testingTimeout);

        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            // Announce the resource manager as leader so that the JobMaster connects to it.
            this.rmLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerId.toUUID());

            final Tuple3<JobMasterId, ResourceID, JobID> registrationInformation =
                jobManagerRegistrationFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(registrationInformation.f0, Matchers.equalTo(this.jobMasterId));
            MatcherAssert.assertThat(registrationInformation.f1, Matchers.equalTo(this.jmResourceId));
            MatcherAssert.assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID()));

            final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));

            // The JobMaster is expected to register again; fail instead of hanging if it never does.
            Assert.assertTrue(
                "The JobMaster did not register a second time at the resource manager within the testing timeout.",
                registrationAttempts.await(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS));
        } finally {
            jobManagerSharedServices.shutdown();
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Creates a savepoint with id 42, configures a checkpointing job graph to restore from it and
     * verifies that, after the JobMaster has been constructed, the completed checkpoint store's
     * latest checkpoint carries the savepoint id.
     */
    @Test
    public void testRestoringFromSavepoint() throws Exception {
        final long savepointId = 42L;
        final File savepointFile = createSavepoint(savepointId);

        final SavepointRestoreSettings restoreSettings =
            SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
        final JobGraph checkpointingJobGraph = createJobGraphWithCheckpointing(restoreSettings);

        // Hand the JobMaster a store we can inspect afterwards.
        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        haServices.setCheckpointRecoveryFactory(
            new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter()));

        final JobMaster jobMaster = createJobMaster(
            configuration,
            checkpointingJobGraph,
            haServices,
            new TestingJobManagerSharedServicesBuilder().build());

        try {
            final CompletedCheckpoint restoredCheckpoint = completedCheckpointStore.getLatestCheckpoint();

            MatcherAssert.assertThat(restoredCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(restoredCheckpoint.getCheckpointID(), Matchers.is(savepointId));
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Restores a job graph consisting of a single new vertex from a savepoint that contains
     * operator state for a different operator id.
     *
     * <p>With the restore flag set to {@code false} (presumably "allow non-restored state" -
     * confirm against {@code SavepointRestoreSettings}) creating the JobMaster must fail with an
     * {@link IllegalStateException}; with the flag set to {@code true} it must succeed and the
     * checkpoint store must contain the savepoint with id 42.
     */
    @Test
    public void testRestoringModifiedJobFromSavepoint() throws Exception {
        final long savepointId = 42L;
        final OperatorID operatorID = new OperatorID();
        final File savepointFile = createSavepointWithOperatorState(savepointId, operatorID);
        final String savepointPath = savepointFile.getAbsolutePath();

        // Job graph whose only vertex is unknown to the savepoint.
        final JobVertex newOperatorVertex = new JobVertex("New operator");
        newOperatorVertex.setInvokableClass(NoOpInvokable.class);
        final JobGraph jobGraphWithNewOperator = createJobGraphFromJobVerticesWithCheckpointing(
            SavepointRestoreSettings.forPath(savepointPath, false),
            newOperatorVertex);

        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        haServices.setCheckpointRecoveryFactory(
            new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter()));

        try {
            createJobMaster(
                configuration,
                jobGraphWithNewOperator,
                haServices,
                new TestingJobManagerSharedServicesBuilder().build());
            Assert.fail("Should fail because we cannot resume the changed JobGraph from the savepoint.");
        } catch (IllegalStateException expected) {
            // expected: the first attempt is supposed to be rejected
        }

        // Second attempt with the restore flag flipped to true.
        jobGraphWithNewOperator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));

        final JobMaster jobMaster = createJobMaster(
            configuration,
            jobGraphWithNewOperator,
            haServices,
            new TestingJobManagerSharedServicesBuilder().build());

        try {
            final CompletedCheckpoint restoredCheckpoint = completedCheckpointStore.getLatestCheckpoint();

            MatcherAssert.assertThat(restoredCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(restoredCheckpoint.getCheckpointID(), Matchers.is(savepointId));
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Verifies that a JobMaster created for a checkpointing job graph, with a restart strategy
     * factory derived from the (otherwise unmodified) test configuration, ends up with a
     * {@link FixedDelayRestartStrategy}.
     *
     * <p>The JobMaster is terminated in a {@code finally} block, like in the sibling tests, so
     * that the RPC endpoint is not leaked into the following tests.
     */
    @Test
    public void testAutomaticRestartingWhenCheckpointing() throws Exception {
        final long savepointId = 42L;
        final File savepointFile = this.createSavepoint(savepointId);
        final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
        final JobGraph jobGraph = this.createJobGraphWithCheckpointing(savepointRestoreSettings);

        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory =
            new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter());
        this.haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);

        final JobMaster jobMaster = this.createJobMaster(
            new Configuration(),
            jobGraph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder()
                .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(this.configuration))
                .build());

        try {
            final RestartStrategy restartStrategy = jobMaster.getRestartStrategy();

            Assert.assertNotNull(restartStrategy);
            Assert.assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
        } finally {
            // Previously missing: release the endpoint even though it was never started.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Pre-populates the completed checkpoint store with a checkpoint with id 1 while the job
     * graph is also configured to restore from a savepoint with id 42, and verifies that after
     * constructing the JobMaster the store's latest checkpoint is still the one with id 1.
     */
    @Test
    public void testCheckpointPrecedesSavepointRecovery() throws Exception {
        final long savepointId = 42L;
        final File savepointFile = createSavepoint(savepointId);
        final JobGraph checkpointingJobGraph = createJobGraphWithCheckpointing(
            SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true));

        // A completed checkpoint that already exists before the JobMaster is created.
        final long checkpointId = 1L;
        final CompletedCheckpoint existingCheckpoint = new CompletedCheckpoint(
            checkpointingJobGraph.getJobID(),
            checkpointId,
            1L,
            1L,
            Collections.emptyMap(),
            null,
            CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
            new DummyCheckpointStorageLocation());

        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        completedCheckpointStore.addCheckpoint(existingCheckpoint);
        haServices.setCheckpointRecoveryFactory(
            new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter()));

        final JobMaster jobMaster = createJobMaster(
            configuration,
            checkpointingJobGraph,
            haServices,
            new TestingJobManagerSharedServicesBuilder().build());

        try {
            final CompletedCheckpoint latestCheckpoint = completedCheckpointStore.getLatestCheckpoint();

            MatcherAssert.assertThat(latestCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(latestCheckpoint.getCheckpointID(), Matchers.is(checkpointId));
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Runs a single-vertex job with a restart strategy and a slot request timeout of 10 while no
     * slot is offered at first.
     *
     * <p>The test waits for a first slot request at the testing resource manager, registers a
     * task executor, waits for a second slot request and asserts that at least the configured
     * timeout has elapsed and that no task has been submitted yet. It then offers a slot for the
     * second request and expects that slot to be accepted and the task to be deployed into it.
     */
    @Test
    public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
        final JobGraph restartingJobGraph = createSingleVertexJobWithRestartStrategy();

        final long slotRequestTimeout = 10L;
        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout);

        final JobMaster jobMaster = createJobMaster(
            configuration,
            restartingJobGraph,
            haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);

        try {
            final long startNanos = System.nanoTime();
            jobMaster.start(JobMasterId.generate(), testingTimeout).get();

            // Resource manager that records every slot request it receives.
            final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            final ArrayBlockingQueue<SlotRequest> slotRequests = new ArrayBlockingQueue<>(2);
            resourceManagerGateway.setRequestSlotConsumer(slotRequests::offer);
            rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
            rmLeaderRetrievalService.notifyListener(
                resourceManagerGateway.getAddress(),
                resourceManagerGateway.getFencingToken().toUUID());

            // First slot request; it is deliberately left unanswered.
            slotRequests.take();

            final CompletableFuture<TaskDeploymentDescriptor> submittedTaskFuture = new CompletableFuture<>();
            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setSubmitTaskConsumer((taskDeploymentDescriptor, ignored) -> {
                    submittedTaskFuture.complete(taskDeploymentDescriptor);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();
            rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
            jobMasterGateway
                .registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout)
                .get();

            // Second slot request.
            final SlotRequest slotRequest = slotRequests.take();
            final long endNanos = System.nanoTime();
            final long elapsedMillis = (endNanos - startNanos) / 1000000L;

            MatcherAssert.assertThat(elapsedMillis, Matchers.greaterThanOrEqualTo(slotRequestTimeout));
            MatcherAssert.assertThat(submittedTaskFuture.isDone(), Matchers.is(false));

            // Offer a slot for the second request and expect it to be accepted and used.
            final SlotOffer slotOffer = new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.UNKNOWN);
            final Collection<SlotOffer> acceptedSlots = jobMasterGateway
                .offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout)
                .get();

            MatcherAssert.assertThat(acceptedSlots, Matchers.hasSize(1));
            final SlotOffer acceptedSlot = acceptedSlots.iterator().next();
            MatcherAssert.assertThat(acceptedSlot.getAllocationId(), Matchers.equalTo(slotRequest.getAllocationId()));

            final TaskDeploymentDescriptor submittedTask = submittedTaskFuture.get();
            MatcherAssert.assertThat(submittedTask.getAllocationId(), Matchers.equalTo(slotRequest.getAllocationId()));
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Announces two different resource manager addresses one after the other (with the same
     * resource manager id) and waits until the JobMaster has attempted to register at each of
     * the two testing gateways.
     */
    @Test
    public void testCloseUnestablishedResourceManagerConnection() throws Exception {
        final JobMaster jobMaster = createJobMaster(
            configuration,
            jobGraph,
            haServices,
            new TestingJobManagerSharedServicesBuilder().build());

        try {
            jobMaster.start(JobMasterId.generate(), testingTimeout).get();

            final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
            final String firstResourceManagerAddress = "address1";
            final String secondResourceManagerAddress = "address2";

            final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway();
            final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway();
            rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway);
            rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway);

            // Each latch fires once the corresponding gateway sees a registration attempt.
            final OneShotLatch firstJobManagerRegistration = new OneShotLatch();
            final OneShotLatch secondJobManagerRegistration = new OneShotLatch();
            firstResourceManagerGateway.setRegisterJobManagerConsumer(
                ignored -> firstJobManagerRegistration.trigger());
            secondResourceManagerGateway.setRegisterJobManagerConsumer(
                ignored -> secondJobManagerRegistration.trigger());

            rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID());
            firstJobManagerRegistration.await();

            rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID());
            secondJobManagerRegistration.await();
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that the JobMaster registers again at the ResourceManager after
     * {@code disconnectResourceManager} has been called on its gateway.
     *
     * <p>Registration attempts are captured in a queue of capacity one; the test takes one
     * entry before the disconnect and expects another one afterwards, both carrying this
     * test's {@code jobMasterId}.
     *
     * @throws Exception if the JobMaster fails to start or the test thread is interrupted
     */
    @Test
    public void testReconnectionAfterDisconnect() throws Exception {
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            jobGraph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build());
        JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        CompletableFuture<?> started = jobMaster.start(this.jobMasterId, testingTimeout);
        try {
            started.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue<JobMasterId> registrations = new ArrayBlockingQueue<>(1);
            rmGateway.setRegisterJobManagerConsumer(registration -> registrations.offer(registration.f0));
            rpcService.registerGateway(rmGateway.getAddress(), rmGateway);

            ResourceManagerId rmId = rmGateway.getFencingToken();
            this.rmLeaderRetrievalService.notifyListener(rmGateway.getAddress(), rmId.toUUID());

            // First registration triggered by the leader announcement.
            JobMasterId initialAttempt = registrations.take();
            MatcherAssert.assertThat(initialAttempt, Matchers.equalTo(this.jobMasterId));
            MatcherAssert.assertThat(registrations.isEmpty(), Matchers.is(true));

            gateway.disconnectResourceManager(rmId, new FlinkException("Test exception"));

            // A further registration must show up after the disconnect.
            MatcherAssert.assertThat(registrations.take(), Matchers.equalTo(this.jobMasterId));
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that a JobMaster which is suspended and then started again with a new
     * {@link JobMasterId} registers at the ResourceManager with that new id.
     *
     * @throws Exception if starting or suspending the JobMaster fails or the test thread is
     *     interrupted
     */
    @Test
    public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            jobGraph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build());
        CompletableFuture<?> started = jobMaster.start(this.jobMasterId, testingTimeout);
        try {
            started.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue<JobMasterId> registrations = new ArrayBlockingQueue<>(1);
            rmGateway.setRegisterJobManagerConsumer(registration -> registrations.offer(registration.f0));

            String rmAddress = rmGateway.getAddress();
            rpcService.registerGateway(rmAddress, rmGateway);
            this.rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());

            // Registration under the original id.
            JobMasterId initialAttempt = registrations.take();
            MatcherAssert.assertThat(initialAttempt, Matchers.equalTo(this.jobMasterId));

            jobMaster.suspend(new FlinkException("Test exception."), testingTimeout).get();

            // Restart with a fresh id and expect a registration carrying it.
            JobMasterId regainedLeaderId = JobMasterId.generate();
            jobMaster.start(regainedLeaderId, testingTimeout).get();
            JobMasterId followUpAttempt = registrations.take();
            MatcherAssert.assertThat(followUpAttempt, Matchers.equalTo(regainedLeaderId));
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Requests input splits through the JobMaster gateway and checks that all expected splits
     * are handed out, both before and after a global failure of the execution graph.
     *
     * <p>The job consists of a single vertex with parallelism 1 whose input split source is
     * backed by three {@code TestingInputSplit}s. After {@code failGlobal} the test waits for
     * the vertex to reach {@code SCHEDULED} again and expects the same set of splits to be
     * served once more.
     *
     * @throws Exception if the JobMaster fails to start, a split request fails, or a wait
     *     times out
     */
    @Test
    public void testRequestNextInputSplit() throws Exception {
        List<TestingInputSplit> expectedInputSplits = Arrays.asList(
            new TestingInputSplit(1),
            new TestingInputSplit(42),
            new TestingInputSplit(1337));
        TestingInputSplitSource inputSplitSource = new TestingInputSplitSource(expectedInputSplits);

        JobVertex source = new JobVertex("vertex1");
        source.setParallelism(1);
        source.setInputSplitSource(inputSplitSource);
        source.setInvokableClass(AbstractInvokable.class);

        JobGraph testJobGraph = new JobGraph(source);
        testJobGraph.setAllowQueuedScheduling(true);

        // Fixed-delay restart with one attempt and no delay, built from the test configuration.
        this.configuration.setLong("restart-strategy.fixed-delay.attempts", 1L);
        this.configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder()
            .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(this.configuration))
            .build();

        JobMaster jobMaster = createJobMaster(this.configuration, testJobGraph, this.haServices, jobManagerSharedServices);
        CompletableFuture<?> startFuture = jobMaster.start(this.jobMasterId, testingTimeout);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            ExecutionGraph eg = jobMaster.getExecutionGraph();
            ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();

            // The attempt id is looked up on every call, so the supplier follows restarts.
            SupplierWithException<SerializedInputSplit, Exception> inputSplitSupplier =
                () -> jobMasterGateway
                    .requestNextInputSplit(source.getID(), ev.getCurrentExecutionAttempt().getAttemptId())
                    .get();

            List<InputSplit> actualInputSplits = getInputSplits(expectedInputSplits.size(), inputSplitSupplier);
            Matcher<Iterable<? extends InputSplit>> expectedInputSplitsMatcher =
                Matchers.containsInAnyOrder(expectedInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
            MatcherAssert.assertThat(actualInputSplits, expectedInputSplitsMatcher);

            long maxWaitMillis = 2000L;
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, maxWaitMillis);

            eg.failGlobal(new Exception("Testing exception"));
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, maxWaitMillis);

            // After the restart the same splits must be available again.
            actualInputSplits = getInputSplits(expectedInputSplits.size(), inputSplitSupplier);
            MatcherAssert.assertThat(actualInputSplits, expectedInputSplitsMatcher);
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Pulls {@code numberInputSplits} splits from the supplier, asserting that each one is
     * non-empty, and deserializes them with the system class loader.
     *
     * <p>One additional split is requested afterwards. If that split is not reported as empty,
     * its payload must deserialize to {@code null}.
     *
     * @param numberInputSplits number of non-empty splits expected from the supplier
     * @param nextInputSplit supplier returning the next serialized split on every call
     * @return the deserialized splits in the order they were supplied
     * @throws Exception if the supplier or the deserialization fails
     */
    @Nonnull
    private static List<InputSplit> getInputSplits(int numberInputSplits, SupplierWithException<SerializedInputSplit, Exception> nextInputSplit) throws Exception {
        List<InputSplit> collected = new ArrayList<>(numberInputSplits);
        int fetched = 0;
        while (fetched < numberInputSplits) {
            SerializedInputSplit serialized = nextInputSplit.get();
            MatcherAssert.assertThat(serialized.isEmpty(), Matchers.is(false));
            InputSplit split = InstantiationUtil.deserializeObject(
                serialized.getInputSplitData(), ClassLoader.getSystemClassLoader());
            collected.add(split);
            fetched++;
        }

        // The request following the last expected split must not yield a real split.
        SerializedInputSplit trailing = nextInputSplit.get();
        if (trailing.isEmpty()) {
            return collected;
        }
        InputSplit emptyInputSplit = InstantiationUtil.deserializeObject(
            trailing.getInputSplitData(), ClassLoader.getSystemClassLoader());
        MatcherAssert.assertThat(emptyInputSplit, Matchers.is(Matchers.nullValue()));
        return collected;
    }

    /**
     * Looks up a KvState location under a name that this test never registered and expects
     * the request to fail with an {@code UnknownKvStateLocation} somewhere in the cause chain.
     *
     * @throws Exception if the JobMaster fails to start within the testing timeout
     */
    @Test
    public void testRequestKvStateWithoutRegistration() throws Exception {
        JobGraph graph = createKvJobGraph();
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            graph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        CompletableFuture<?> started = jobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            started.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                gateway.requestKvStateLocation(graph.getJobID(), "unknown").get();
                Assert.fail("Expected to fail with UnknownKvStateLocation");
            }
            catch (Exception e) {
                boolean unknownLocation = ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent();
                Assert.assertTrue(unknownLocation);
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Looks up a KvState location using a newly created {@link JobID} instead of the id of
     * the job graph given to the JobMaster, and expects a {@code FlinkJobNotFoundException}
     * somewhere in the cause chain.
     *
     * @throws Exception if the JobMaster fails to start within the testing timeout
     */
    @Test
    public void testRequestKvStateOfWrongJob() throws Exception {
        JobGraph graph = createKvJobGraph();
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            graph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        CompletableFuture<?> started = jobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            started.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                gateway.requestKvStateLocation(new JobID(), "unknown").get();
                Assert.fail("Expected to fail with FlinkJobNotFoundException");
            }
            catch (Exception e) {
                boolean jobNotFound = ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent();
                Assert.assertTrue(jobNotFound);
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Builds the job graph used by the KvState tests: two vertices named {@code "v1"} and
     * {@code "v2"}, each with parallelism 4, max parallelism 16 and
     * {@code BlockingNoOpInvokable} as invokable class.
     *
     * @return a new job graph containing both vertices, {@code "v1"} first
     */
    @Nonnull
    public JobGraph createKvJobGraph() {
        String[] vertexNames = {"v1", "v2"};
        JobVertex[] vertices = new JobVertex[vertexNames.length];
        for (int i = 0; i < vertexNames.length; i++) {
            JobVertex vertex = new JobVertex(vertexNames[i]);
            vertex.setParallelism(4);
            vertex.setMaxParallelism(16);
            vertex.setInvokableClass(BlockingNoOpInvokable.class);
            vertices[i] = vertex;
        }
        return new JobGraph(vertices);
    }

    /**
     * Sends a KvState registration that carries a newly created {@link JobID} and
     * {@code JobVertexID} rather than ids taken from the job graph, and expects a
     * {@code FlinkJobNotFoundException} somewhere in the cause chain.
     *
     * @throws Exception if the JobMaster fails to start within the testing timeout
     */
    @Test
    public void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        JobGraph graph = createKvJobGraph();
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            graph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        CompletableFuture<?> started = jobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            started.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                gateway.notifyKvStateRegistered(
                    new JobID(),
                    new JobVertexID(),
                    new KeyGroupRange(0, 0),
                    "any-name",
                    new KvStateID(),
                    new InetSocketAddress(InetAddress.getLocalHost(), 1233)).get();
                Assert.fail("Expected to fail with FlinkJobNotFoundException.");
            }
            catch (Exception e) {
                boolean jobNotFound = ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent();
                Assert.assertTrue(jobNotFound);
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Registers a KvState for the first vertex of the KvState job graph, verifies the location
     * returned by a lookup, then unregisters it and expects a subsequent lookup to fail with
     * an {@code UnknownKvStateLocation}.
     *
     * @throws Exception if the JobMaster fails to start or a gateway call that is expected to
     *     succeed fails
     */
    @Test
    public void testRegisterAndUnregisterKvState() throws Exception {
        JobGraph graph = createKvJobGraph();
        List<JobVertex> jobVertices = graph.getVerticesSortedTopologicallyFromSources();
        JobVertex vertex1 = jobVertices.get(0);
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            graph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        CompletableFuture<?> startFuture = jobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            String registrationName = "register-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            jobMasterGateway.notifyKvStateRegistered(
                graph.getJobID(), vertex1.getID(), keyGroupRange, registrationName, kvStateID, address).get();

            KvStateLocation location = jobMasterGateway.requestKvStateLocation(graph.getJobID(), registrationName).get();
            Assert.assertEquals(graph.getJobID(), location.getJobId());
            Assert.assertEquals(vertex1.getID(), location.getJobVertexId());
            Assert.assertEquals(vertex1.getMaxParallelism(), location.getNumKeyGroups());
            Assert.assertEquals(1L, location.getNumRegisteredKeyGroups());
            Assert.assertEquals(1L, keyGroupRange.getNumberOfKeyGroups());
            Assert.assertEquals(kvStateID, location.getKvStateID(keyGroupRange.getStartKeyGroup()));
            Assert.assertEquals(address, location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));

            jobMasterGateway.notifyKvStateUnregistered(
                graph.getJobID(), vertex1.getID(), keyGroupRange, registrationName).get();

            // Once unregistered, the name must no longer resolve.
            try {
                jobMasterGateway.requestKvStateLocation(graph.getJobID(), registrationName).get();
                Assert.fail("Expected to fail with an UnknownKvStateLocation.");
            }
            catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Registers the same KvState name for two different vertices and expects the second
     * registration to fail with a message containing {@code "Registration name clash"} and the
     * execution graph to be in state {@code FAILED}.
     *
     * @throws Exception if the JobMaster fails to start or the first registration fails
     */
    @Test
    public void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        JobGraph graph = createKvJobGraph();
        List<JobVertex> jobVertices = graph.getVerticesSortedTopologicallyFromSources();
        JobVertex vertex1 = jobVertices.get(0);
        JobVertex vertex2 = jobVertices.get(1);
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            graph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        CompletableFuture<?> startFuture = jobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            String registrationName = "duplicate-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 4396);

            jobMasterGateway.notifyKvStateRegistered(
                graph.getJobID(), vertex1.getID(), keyGroupRange, registrationName, kvStateID, address).get();

            // Same name, same key group range, but a different vertex.
            try {
                jobMasterGateway.notifyKvStateRegistered(
                    graph.getJobID(), vertex2.getID(), keyGroupRange, registrationName, kvStateID, address).get();
                Assert.fail("Expected to fail because of clashing registration message.");
            }
            catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Registration name clash").isPresent());
                Assert.assertEquals(JobStatus.FAILED, jobMaster.getExecutionGraph().getState());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Exercises {@code requestPartitionState} on the JobMaster gateway.
     *
     * <p>The test offers one slot, waits for a task deployment descriptor with exactly one
     * produced partition, reports that execution as {@code FINISHED} and then checks:
     * <ul>
     *   <li>a request with the correct result id and partition id yields {@code FINISHED};</li>
     *   <li>a request with a new {@code ResultPartitionID} fails with
     *       {@link IllegalArgumentException};</li>
     *   <li>a request with a new {@code IntermediateDataSetID} fails with
     *       {@link IllegalArgumentException};</li>
     *   <li>a request with the correct partition but a new {@code ExecutionAttemptID} fails
     *       with {@code PartitionProducerDisposedException}.</li>
     * </ul>
     *
     * @throws Exception if the JobMaster fails to start or a call that is expected to succeed
     *     fails
     */
    @Test
    public void testRequestPartitionState() throws Exception {
        JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
        JobMaster jobMaster = createJobMaster(
            this.configuration,
            producerConsumerJobGraph,
            this.haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);
        CompletableFuture<?> started = jobMaster.start(this.jobMasterId, testingTimeout);
        try {
            started.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            // ResourceManager stub that captures the allocation id of the slot request.
            TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
            CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
            rmGateway.setRequestSlotConsumer(request -> allocationIdFuture.complete(request.getAllocationId()));
            rpcService.registerGateway(rmGateway.getAddress(), rmGateway);

            // TaskExecutor stub that captures the submitted deployment descriptor.
            CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
            TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setSubmitTaskConsumer((descriptor, ignoredJobMasterId) -> {
                    tddFuture.complete(descriptor);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();
            rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

            JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            this.rmLeaderRetrievalService.notifyListener(rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());

            AllocationID allocationId = allocationIdFuture.get();
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            gateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();

            SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
            Collection<SlotOffer> slotOffers = gateway
                .offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout)
                .get();
            MatcherAssert.assertThat(slotOffers, Matchers.hasSize(1));
            MatcherAssert.assertThat(slotOffers, Matchers.contains(slotOffer));

            TaskDeploymentDescriptor tdd = tddFuture.get();
            MatcherAssert.assertThat(tdd.getProducedPartitions(), Matchers.hasSize(1));
            ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next();

            // Build an equal but distinct attempt id from the two parts of the original.
            ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
            ExecutionAttemptID copiedExecutionAttemptId =
                new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart());

            gateway.updateTaskExecutionState(
                new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();

            ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId);
            CompletableFuture<ExecutionState> partitionStateFuture =
                gateway.requestPartitionState(partition.getResultId(), partitionId);
            MatcherAssert.assertThat(partitionStateFuture.get(), Matchers.equalTo(ExecutionState.FINISHED));

            // Case: new result partition id.
            partitionStateFuture = gateway.requestPartitionState(partition.getResultId(), new ResultPartitionID());
            try {
                partitionStateFuture.get();
                Assert.fail("Expected failure.");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat(
                    ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(), Matchers.is(true));
            }

            // Case: new intermediate data set id.
            partitionStateFuture = gateway.requestPartitionState(new IntermediateDataSetID(), partitionId);
            try {
                partitionStateFuture.get();
                Assert.fail("Expected failure.");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat(
                    ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(), Matchers.is(true));
            }

            // Case: correct partition, new producer attempt id.
            partitionStateFuture = gateway.requestPartitionState(
                partition.getResultId(),
                new ResultPartitionID(partition.getPartitionId(), new ExecutionAttemptID()));
            try {
                partitionStateFuture.get();
                Assert.fail("Expected failure.");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat(
                    ExceptionUtils.findThrowable(e, PartitionProducerDisposedException.class).isPresent(), Matchers.is(true));
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that the timeout passed to {@code triggerSavepoint} through the gateway is
     * honoured.
     *
     * <p>The JobMaster under test overrides {@code triggerSavepoint} to return a future that
     * is never completed. A call with a 1 ms timeout is expected to fail with a
     * {@link TimeoutException}, while a call with {@code RpcUtils.INF_TIMEOUT} must still be
     * pending at that point.
     *
     * @throws Exception if the JobMaster fails to start or the low-timeout future does not
     *     complete within the testing timeout
     */
    @Test
    public void testTriggerSavepointTimeout() throws Exception {
        JobMaster jobMaster = new JobMaster(
                rpcService,
                JobMasterConfiguration.fromConfiguration(this.configuration),
                this.jmResourceId,
                jobGraph,
                this.haServices,
                DefaultSlotPoolFactory.fromConfiguration(this.configuration, rpcService),
                new TestingJobManagerSharedServicesBuilder().build(),
                heartbeatServices,
                this.blobServer,
                UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
                new NoOpOnCompletionActions(),
                this.testingFatalErrorHandler,
                JobMasterTest.class.getClassLoader()) {

            @Override
            public CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob, Time timeout) {
                // Never completed, so only the RPC timeout can finish the caller's future.
                return new CompletableFuture<>();
            }
        };
        try {
            CompletableFuture<?> startFuture = jobMaster.start(this.jobMasterId, testingTimeout);
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture<String> savepointFutureLowTimeout =
                jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1L));
            CompletableFuture<String> savepointFutureHighTimeout =
                jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);

            try {
                savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
                Assert.fail("Expected the savepoint request with the low timeout to time out.");
            }
            catch (ExecutionException e) {
                Throwable cause = ExceptionUtils.stripExecutionException(e);
                MatcherAssert.assertThat(cause, Matchers.instanceOf(TimeoutException.class));
            }

            MatcherAssert.assertThat(savepointFutureHighTimeout.isDone(), Matchers.is(Matchers.equalTo(false)));
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that after an allocation failure is reported for the only offered slot, the
     * TaskExecutor gateway is asked to free that slot and is disconnected for this job.
     *
     * <p>The TaskExecutor stub completes one future with the allocation id passed to its
     * free-slot function and another with the job id passed to its disconnect callback; the
     * test asserts both values.
     *
     * @throws Exception if the JobMaster fails to start or any awaited future fails
     */
    @Test
    public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
        JobManagerSharedServices sharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobGraph singleVertexJob = createSingleVertexJobWithRestartStrategy();
        JobMaster jobMaster = createJobMaster(
            this.configuration, singleVertexJob, this.haServices, sharedServices, heartbeatServices);

        TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(rmGateway.getAddress(), rmGateway);
        this.rmLeaderRetrievalService.notifyListener(rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());

        CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        rmGateway.setRequestSlotConsumer(request -> allocationIdFuture.complete(request.getAllocationId()));

        CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
        CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
            .setFreeSlotFunction((freedAllocationId, cause) -> {
                freedSlotFuture.complete(freedAllocationId);
                return CompletableFuture.completedFuture(Acknowledge.get());
            })
            .setDisconnectJobManagerConsumer((disconnectedJobId, cause) -> disconnectTaskExecutorFuture.complete(disconnectedJobId))
            .createTestingTaskExecutorGateway();
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        try {
            jobMaster.start(this.jobMasterId, testingTimeout).get();
            JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);

            AllocationID allocationId = allocationIdFuture.get();
            gateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();

            SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
            CompletableFuture<Collection<SlotOffer>> acceptedSlotOffers =
                gateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
            Collection<SlotOffer> slotOffers = acceptedSlotOffers.get();
            MatcherAssert.assertThat(slotOffers, Matchers.hasSize(1));

            gateway.notifyAllocationFailure(allocationId, new FlinkException("Fail alloction test exception"));

            MatcherAssert.assertThat(freedSlotFuture.get(), Matchers.equalTo(allocationId));
            MatcherAssert.assertThat(disconnectTaskExecutorFuture.get(), Matchers.equalTo(singleVertexJob.getJobID()));
        }
        finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Builds a two-vertex job graph in which {@code "Consumer"} reads from {@code "Producer"}
     * through a new data set connected {@code POINTWISE} with a {@code BLOCKING} result
     * partition. Both vertices use {@code NoOpInvokable}; queued scheduling is allowed.
     *
     * @return the producer/consumer job graph
     */
    private JobGraph producerConsumerJobGraph() {
        JobVertex upstream = new JobVertex("Producer");
        upstream.setInvokableClass(NoOpInvokable.class);

        JobVertex downstream = new JobVertex("Consumer");
        downstream.setInvokableClass(NoOpInvokable.class);
        downstream.connectNewDataSetAsInput(upstream, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        JobGraph graph = new JobGraph(upstream, downstream);
        graph.setAllowQueuedScheduling(true);
        return graph;
    }

    /**
     * Writes a savepoint with the given id and no operator state to a new temporary file.
     *
     * @param savepointId id stored in the savepoint metadata
     * @return the file containing the savepoint metadata
     * @throws IOException if the file cannot be created or written
     */
    private File createSavepoint(long savepointId) throws IOException {
        // No operator ids: the varargs call passes an empty array.
        return createSavepointWithOperatorState(savepointId);
    }

    /**
     * Writes a {@code SavepointV2} with the given id and one operator state per supplied
     * operator id to a new file in the temporary folder.
     *
     * @param savepointId id stored in the savepoint metadata
     * @param operatorIds operators for which state is included in the savepoint
     * @return the file containing the savepoint metadata
     * @throws IOException if the file cannot be created or written
     */
    private File createSavepointWithOperatorState(long savepointId, OperatorID ... operatorIds) throws IOException {
        File target = temporaryFolder.newFile();
        SavepointV2 savepoint = new SavepointV2(savepointId, createOperatorState(operatorIds), Collections.emptyList());
        try (OutputStream out = new FileOutputStream(target)) {
            Checkpoints.storeCheckpointMetadata(savepoint, out);
        }
        return target;
    }

    /**
     * Creates one {@code OperatorState} per operator id, each constructed with the arguments
     * {@code (operatorId, 1, 42)} and holding a single subtask state at index 0.
     *
     * <p>The subtask state wraps an operator stream state handle with an empty map and a
     * zero-length byte handle named {@code "foobar"}; its other three constructor arguments
     * are {@code null}.
     *
     * @param operatorIds operators to create state for
     * @return the created operator states, in the order of {@code operatorIds}
     */
    private Collection<OperatorState> createOperatorState(OperatorID ... operatorIds) {
        List<OperatorState> result = new ArrayList<>(operatorIds.length);
        for (int i = 0; i < operatorIds.length; i++) {
            StreamStateHandle delegate = new ByteStreamStateHandle("foobar", new byte[0]);
            OperatorStateHandle operatorHandle = new OperatorStreamStateHandle(Collections.emptyMap(), delegate);
            OperatorSubtaskState subtaskState = new OperatorSubtaskState(operatorHandle, null, null, null);

            OperatorState state = new OperatorState(operatorIds[i], 1, 42);
            state.putState(0, subtaskState);
            result.add(state);
        }
        return result;
    }

    /**
     * Creates a job graph without vertices that has checkpointing configured and the given
     * savepoint restore settings applied.
     *
     * @param savepointRestoreSettings restore settings to set on the job graph
     * @return the configured job graph
     */
    @Nonnull
    private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
        // No vertices: the varargs call passes an empty array.
        return createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings);
    }

    /**
     * Creates a job graph from the given vertices, attaches checkpointing settings and applies
     * the given savepoint restore settings.
     *
     * <p>The checkpointing settings use three empty vertex lists, a coordinator configuration
     * built from {@code (1000L, 1000L, 1000L, 1, NEVER_RETAIN_AFTER_TERMINATION, true)} and
     * {@code null} as last constructor argument.
     *
     * @param savepointRestoreSettings restore settings to set on the job graph
     * @param jobVertices vertices of the job graph
     * @return the configured job graph
     */
    @Nonnull
    private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex ... jobVertices) {
        JobGraph graph = new JobGraph(jobVertices);
        CheckpointCoordinatorConfiguration coordinatorConfiguration = new CheckpointCoordinatorConfiguration(
            1000L,
            1000L,
            1000L,
            1,
            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
            true);
        graph.setSnapshotSettings(new JobCheckpointingSettings(
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyList(),
            coordinatorConfiguration,
            null));
        graph.setSavepointRestoreSettings(savepointRestoreSettings);
        return graph;
    }

    /**
     * Creates a {@link JobMaster} using the test's {@code fastHeartbeatServices}.
     *
     * <p>Convenience overload that delegates to the five-argument {@code createJobMaster}.
     *
     * @param configuration configuration forwarded to the delegate
     * @param jobGraph job graph the job master is created for
     * @param highAvailabilityServices high availability services forwarded to the delegate
     * @param jobManagerSharedServices shared services forwarded to the delegate
     * @return the newly created job master
     * @throws Exception if the delegate fails to create the job master
     */
    @Nonnull
    private JobMaster createJobMaster(
            Configuration configuration,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityServices,
            JobManagerSharedServices jobManagerSharedServices) throws Exception {
        return createJobMaster(
            configuration,
            jobGraph,
            highAvailabilityServices,
            jobManagerSharedServices,
            fastHeartbeatServices);
    }

    /**
     * Creates a {@link JobMaster} wired to this test's fixtures.
     *
     * <p>The job master configuration and the slot pool factory are both derived from
     * {@code configuration}. The remaining collaborators are the test's RPC service, resource id,
     * blob server and fatal error handler, an unregistered metric group factory, no-op completion
     * actions and the class loader of {@code JobMasterTest}.
     *
     * @param configuration configuration used for the job master configuration and slot pool factory
     * @param jobGraph job graph the job master is created for
     * @param highAvailabilityServices high availability services passed to the job master
     * @param jobManagerSharedServices shared services passed to the job master
     * @param heartbeatServices heartbeat services passed to the job master
     * @return the newly created job master
     * @throws Exception if deriving the configuration or constructing the job master fails
     */
    @Nonnull
    private JobMaster createJobMaster(
            Configuration configuration,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityServices,
            JobManagerSharedServices jobManagerSharedServices,
            HeartbeatServices heartbeatServices) throws Exception {
        final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

        // Typed locals keep the same static argument types the explicit casts used to enforce.
        final RpcService rpc = rpcService;
        final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration, rpc);
        final JobManagerJobMetricGroupFactory metricGroupFactory = UnregisteredJobManagerJobMetricGroupFactory.INSTANCE;
        final OnCompletionActions completionActions = new NoOpOnCompletionActions();
        final FatalErrorHandler fatalErrorHandler = this.testingFatalErrorHandler;

        return new JobMaster(
            rpc,
            jobMasterConfiguration,
            this.jmResourceId,
            jobGraph,
            highAvailabilityServices,
            slotPoolFactory,
            jobManagerSharedServices,
            heartbeatServices,
            this.blobServer,
            metricGroupFactory,
            completionActions,
            fatalErrorHandler,
            JobMasterTest.class.getClassLoader());
    }

    /**
     * Creates a {@link JobGraph} with a single {@link NoOpInvokable} vertex named
     * {@code "Test vertex"}.
     *
     * <p>Queued scheduling is allowed, and the execution config uses a fixed-delay restart
     * strategy created with the arguments {@code (Integer.MAX_VALUE, 0L)}.
     *
     * @return the configured single-vertex job graph
     * @throws IOException propagated from the job graph setup calls
     */
    private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
        final ExecutionConfig restartingConfig = new ExecutionConfig();
        restartingConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

        final JobVertex vertex = new JobVertex("Test vertex");
        vertex.setInvokableClass(NoOpInvokable.class);

        final JobGraph graph = new JobGraph(new JobVertex[]{vertex});
        graph.setAllowQueuedScheduling(true);
        graph.setExecutionConfig(restartingConfig);
        return graph;
    }

    /**
     * Placeholder {@link CompletedCheckpointStorageLocation} for tests: it exposes no external
     * pointer and no metadata handle, and disposing it does nothing.
     */
    private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation {

        private static final long serialVersionUID = 164095949572620688L;

        private DummyCheckpointStorageLocation() {
            // nothing to initialise
        }

        /** Always returns {@code null}. */
        public String getExternalPointer() {
            return null;
        }

        /** Always returns {@code null}. */
        public StreamStateHandle getMetadataHandle() {
            return null;
        }

        /** Does nothing. */
        public void disposeStorageLocation() throws IOException {
            // intentionally empty
        }
    }

    /** {@link OnCompletionActions} implementation whose callbacks all do nothing. */
    private static final class NoOpOnCompletionActions implements OnCompletionActions {

        private NoOpOnCompletionActions() {
            // nothing to initialise
        }

        /** Ignores the notification. */
        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
            // intentionally empty
        }

        /** Ignores the notification. */
        public void jobFinishedByOther() {
            // intentionally empty
        }

        /** Ignores the notification. */
        public void jobMasterFailed(Throwable cause) {
            // intentionally empty
        }
    }

    /**
     * Minimal {@link InputSplit} identified solely by its split number; two instances are equal
     * exactly when they are of the same class and carry the same split number.
     */
    private static final class TestingInputSplit implements InputSplit {

        private static final long serialVersionUID = -5404803705463116083L;

        private final int splitNumber;

        /**
         * @param number the split number reported by {@link #getSplitNumber()}
         */
        TestingInputSplit(int number) {
            splitNumber = number;
        }

        /** Returns the split number given at construction. */
        public int getSplitNumber() {
            return splitNumber;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != getClass()) {
                return false;
            }
            final TestingInputSplit other = (TestingInputSplit) o;
            return other.splitNumber == splitNumber;
        }

        @Override
        public int hashCode() {
            return Objects.hash(splitNumber);
        }
    }

    /**
     * {@link InputSplitSource} that serves a fixed, pre-built list of {@link TestingInputSplit}s
     * and hands them out through a {@link DefaultInputSplitAssigner}.
     */
    private static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {

        private static final long serialVersionUID = -2344684048759139086L;

        /** Splits returned by {@link #createInputSplits(int)}; the list is stored as passed in. */
        private final List<TestingInputSplit> inputSplits;

        private TestingInputSplitSource(List<TestingInputSplit> inputSplits) {
            this.inputSplits = inputSplits;
        }

        /**
         * Returns the configured splits as an array.
         *
         * @param minNumSplits not used by this implementation
         * @return the result of copying the stored list via {@code toArray} with
         *     {@code EMPTY_TESTING_INPUT_SPLITS} as the target array
         */
        public TestingInputSplit[] createInputSplits(int minNumSplits) {
            final TestingInputSplit[] splits = inputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS);
            return splits;
        }

        /**
         * Creates a {@link DefaultInputSplitAssigner} over the given splits.
         *
         * @param inputSplits splits to hand to the assigner
         * @return a new assigner for {@code inputSplits}
         */
        public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] inputSplits) {
            // Typed local keeps the InputSplit[] static type the original cast selected.
            final InputSplit[] splits = inputSplits;
            return new DefaultInputSplitAssigner(splits);
        }
    }
}

