/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.FaultySubmittedJobGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;

public class DispatcherTest
extends TestLogger {
    private static RpcService rpcService;
    private static final Time TIMEOUT;
    private static final JobID TEST_JOB_ID;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public TestName name = new TestName();
    private JobGraph jobGraph;
    private TestingFatalErrorHandler fatalErrorHandler;
    private FaultySubmittedJobGraphStore submittedJobGraphStore;
    private TestingLeaderElectionService dispatcherLeaderElectionService;
    private TestingLeaderElectionService jobMasterLeaderElectionService;
    private RunningJobsRegistry runningJobsRegistry;
    private CountDownLatch createdJobManagerRunnerLatch;
    private Configuration configuration;
    private BlobServer blobServer;
    private TestingDispatcher dispatcher;
    private TestingHighAvailabilityServices haServices;
    private HeartbeatServices heartbeatServices;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService((RpcService)rpcService, (Time)TIMEOUT);
            rpcService = null;
        }
    }

    @Before
    public void setUp() throws Exception {
        JobVertex testVertex = new JobVertex("testVertex");
        testVertex.setInvokableClass(NoOpInvokable.class);
        this.jobGraph = new JobGraph(TEST_JOB_ID, "testJob", new JobVertex[]{testVertex});
        this.jobGraph.setAllowQueuedScheduling(true);
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.heartbeatServices = new HeartbeatServices(1000L, 10000L);
        this.submittedJobGraphStore = new FaultySubmittedJobGraphStore();
        this.dispatcherLeaderElectionService = new TestingLeaderElectionService();
        this.jobMasterLeaderElectionService = new TestingLeaderElectionService();
        this.haServices = new TestingHighAvailabilityServices();
        this.haServices.setDispatcherLeaderElectionService(this.dispatcherLeaderElectionService);
        this.haServices.setSubmittedJobGraphStore(this.submittedJobGraphStore);
        this.haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, this.jobMasterLeaderElectionService);
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
        this.haServices.setResourceManagerLeaderRetriever((LeaderRetrievalService)new SettableLeaderRetrievalService());
        this.runningJobsRegistry = this.haServices.getRunningJobsRegistry();
        this.configuration = new Configuration();
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        this.createdJobManagerRunnerLatch = new CountDownLatch(2);
        this.blobServer = new BlobServer(this.configuration, (BlobStore)new VoidBlobStore());
    }

    @Nonnull
    private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        TestingDispatcher dispatcher = this.createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
        dispatcher.start();
        return dispatcher;
    }

    @Nonnull
    private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        return new TestingDispatcher(rpcService, "dispatcher_" + this.name.getMethodName(), this.configuration, haServices, new TestingResourceManagerGateway(), this.blobServer, heartbeatServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), null, (ArchivedExecutionGraphStore)new MemoryArchivedExecutionGraphStore(), jobManagerRunnerFactory, this.fatalErrorHandler);
    }

    @After
    public void tearDown() throws Exception {
        try {
            this.fatalErrorHandler.rethrowError();
        }
        finally {
            if (this.dispatcher != null) {
                RpcUtils.terminateRpcEndpoint((RpcEndpoint)this.dispatcher, (Time)TIMEOUT);
            }
        }
        if (this.haServices != null) {
            this.haServices.closeAndCleanupAllData();
        }
        if (this.blobServer != null) {
            this.blobServer.close();
        }
    }

    @Test
    public void testJobSubmission() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        CompletableFuture<UUID> leaderFuture = this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
        leaderFuture.get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        acknowledgeFuture.get();
        Assert.assertTrue((String)"jobManagerRunner was not started", (boolean)this.dispatcherLeaderElectionService.isStarted());
    }

    @Test
    public void testLeaderElection() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        CompletableFuture jobIdsFuture = new CompletableFuture();
        this.submittedJobGraphStore.setJobIdsFunction((FunctionWithException<Collection<JobID>, Collection<JobID>, ? extends Exception>)((FunctionWithException)jobIds -> {
            jobIdsFuture.complete(null);
            return jobIds;
        }));
        this.electDispatcher();
        jobIdsFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testSubmittedJobGraphListener() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        SubmittedJobGraph submittedJobGraph = this.submittedJobGraphStore.recoverJobGraph(TEST_JOB_ID);
        this.submittedJobGraphStore.removeJobGraph(TEST_JOB_ID);
        this.dispatcher.onRemovedJobGraph(TEST_JOB_ID);
        Assert.assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), (Matcher)Matchers.empty());
        this.runningJobsRegistry.clearJob(TEST_JOB_ID);
        this.submittedJobGraphStore.putJobGraph(submittedJobGraph);
        this.dispatcher.onAddedJobGraph(TEST_JOB_ID);
        this.createdJobManagerRunnerLatch.await();
        Assert.assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), (Matcher)Matchers.hasSize((int)1));
    }

    @Test
    public void testOnAddedJobGraphRecoveryFailure() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        FlinkException expectedFailure = new FlinkException("Expected failure");
        this.submittedJobGraphStore.setRecoveryFailure((Exception)expectedFailure);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(this.jobGraph, null));
        this.dispatcher.onAddedJobGraph(TEST_JOB_ID);
        CompletableFuture<Throwable> errorFuture = this.fatalErrorHandler.getErrorFuture();
        Throwable throwable = errorFuture.get();
        Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)throwable, expectedFailure::equals).isPresent(), (Matcher)Is.is((Object)true));
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(this.jobGraph, null));
        this.runningJobsRegistry.setJobFinished(TEST_JOB_ID);
        this.dispatcher.onAddedJobGraph(TEST_JOB_ID);
        this.dispatcher.getRecoverOperationFuture(TIMEOUT).get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        Assert.assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), (Matcher)Is.is((Matcher)Matchers.empty()));
    }

    @Test
    public void testCacheJobExecutionResult() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        JobID failedJobId = new JobID();
        JobStatus expectedState = JobStatus.FAILED;
        ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder().setJobID(failedJobId).setState(expectedState).setFailureCause(new ErrorInfo((Throwable)new RuntimeException("expected"), 1L)).build();
        this.dispatcher.completeJobExecution(failedExecutionGraph);
        Assert.assertThat(dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(), (Matcher)Matchers.equalTo((Object)expectedState));
        Assert.assertThat(dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(), (Matcher)Matchers.equalTo((Object)failedExecutionGraph));
    }

    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        try {
            dispatcherGateway.requestJob(new JobID(), TIMEOUT).get();
        }
        catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            Assert.assertThat((Object)throwable, (Matcher)Matchers.instanceOf(FlinkJobNotFoundException.class));
        }
    }

    @Test
    public void testJobRecovery() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(this.submittedJobGraphStore.getJobIds(), (Matcher)Matchers.contains((Object[])new JobID[]{this.jobGraph.getJobID()}));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat((Object)this.runningJobsRegistry.getJobSchedulingStatus(this.jobGraph.getJobID()), (Matcher)Is.is((Object)RunningJobsRegistry.JobSchedulingStatus.RUNNING));
        this.dispatcherLeaderElectionService.notLeader();
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        this.createdJobManagerRunnerLatch.await();
        Collection jobIds = (Collection)dispatcherGateway.listJobs(TIMEOUT).get();
        Assert.assertThat((Object)jobIds, (Matcher)Matchers.hasSize((int)1));
        Assert.assertThat((Object)jobIds, (Matcher)Matchers.contains((Object[])new JobID[]{this.jobGraph.getJobID()}));
    }

    @Test
    public void testSavepointDisposal() throws Exception {
        URI externalPointer = this.createTestingSavepoint();
        Path savepointPath = Paths.get(externalPointer);
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat((Object)Files.exists(savepointPath, new LinkOption[0]), (Matcher)Is.is((Object)true));
        dispatcherGateway.disposeSavepoint(externalPointer.toString(), TIMEOUT).get();
        Assert.assertThat((Object)Files.exists(savepointPath, new LinkOption[0]), (Matcher)Is.is((Object)false));
    }

    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        StateBackend stateBackend = Checkpoints.loadStateBackend((Configuration)this.configuration, (ClassLoader)Thread.currentThread().getContextClassLoader(), (Logger)this.log);
        CheckpointStorage checkpointStorage = stateBackend.createCheckpointStorage(this.jobGraph.getJobID());
        File savepointFile = this.temporaryFolder.newFolder();
        long checkpointId = 1L;
        CheckpointStorageLocation checkpointStorageLocation = checkpointStorage.initializeLocationForSavepoint(1L, savepointFile.getAbsolutePath());
        CheckpointMetadataOutputStream metadataOutputStream = checkpointStorageLocation.createMetadataOutputStream();
        Checkpoints.storeCheckpointMetadata((Savepoint)new SavepointV2(1L, Collections.emptyList(), Collections.emptyList()), (OutputStream)metadataOutputStream);
        CompletedCheckpointStorageLocation completedCheckpointStorageLocation = metadataOutputStream.closeAndFinalizeCheckpoint();
        return new URI(completedCheckpointStorageLocation.getExternalPointer());
    }

    @Test
    public void testWaitingForJobMasterLeadership() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CompletableFuture jobStatusFuture = dispatcherGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT);
        Assert.assertThat((Object)jobStatusFuture.isDone(), (Matcher)Is.is((Object)false));
        try {
            jobStatusFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"Should not complete.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat(jobStatusFuture.get(), (Matcher)Matchers.notNullValue());
    }

    @Test
    public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        FlinkException testException = new FlinkException("Test exception");
        this.submittedJobGraphStore.setJobIdsFunction((FunctionWithException<Collection<JobID>, Collection<JobID>, ? extends Exception>)((FunctionWithException)jobIds -> {
            throw testException;
        }));
        this.electDispatcher();
        Throwable error = this.fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)ExceptionUtils.findThrowableWithMessage((Throwable)error, (String)testException.getMessage()).isPresent(), (Matcher)Is.is((Object)true));
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(this.jobGraph, null);
        this.submittedJobGraphStore.putJobGraph(submittedJobGraph);
        this.submittedJobGraphStore.setRecoverJobGraphFunction((BiFunctionWithException<JobID, Map<JobID, SubmittedJobGraph>, SubmittedJobGraph, ? extends Exception>)((BiFunctionWithException)(jobId, submittedJobs) -> {
            throw testException;
        }));
        this.electDispatcher();
        Throwable error = this.fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)ExceptionUtils.findThrowableWithMessage((Throwable)error, (String)testException.getMessage()).isPresent(), (Matcher)Is.is((Object)true));
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        JobGraph failingJobGraph = this.createFailingJobGraph((Exception)testException);
        SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(failingJobGraph, null);
        this.submittedJobGraphStore.putJobGraph(submittedJobGraph);
        this.electDispatcher();
        Throwable error = this.fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)ExceptionUtils.findThrowableWithMessage((Throwable)error, (String)testException.getMessage()).isPresent(), (Matcher)Is.is((Object)true));
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void testBlockingJobManagerRunner() throws Exception {
        OneShotLatch jobManagerRunnerCreationLatch = new OneShotLatch();
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new BlockingJobManagerRunnerFactory((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> ((OneShotLatch)jobManagerRunnerCreationLatch).await())));
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        Assert.assertThat((Object)submissionFuture.isDone(), (Matcher)Is.is((Object)false));
        CompletableFuture metricQueryServicePathsFuture = dispatcherGateway.requestMetricQueryServicePaths(Time.seconds((long)5L));
        Assert.assertThat(metricQueryServicePathsFuture.get(), (Matcher)Is.is((Matcher)Matchers.empty()));
        Assert.assertThat((Object)submissionFuture.isDone(), (Matcher)Is.is((Object)false));
        jobManagerRunnerCreationLatch.trigger();
        submissionFuture.get();
    }

    @Test
    public void testFailingJobManagerRunnerCleanup() throws Exception {
        FlinkException testException = new FlinkException("Test exception.");
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new BlockingJobManagerRunnerFactory((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            Optional take = (Optional)queue.take();
            Exception exception = take.orElse(null);
            if (exception != null) {
                throw exception;
            }
        })));
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        Assert.assertThat((Object)submissionFuture.isDone(), (Matcher)Is.is((Object)false));
        queue.offer(Optional.of(testException));
        try {
            submissionFuture.get();
            Assert.fail((String)"Should fail because we could not instantiate the JobManagerRunner.");
        }
        catch (Exception e) {
            Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, t -> t.equals(testException)).isPresent(), (Matcher)Is.is((Object)true));
        }
        submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        queue.offer(Optional.empty());
        submissionFuture.get();
    }

    private void electDispatcher() {
        UUID expectedLeaderSessionId = UUID.randomUUID();
        Assert.assertNull(this.dispatcherLeaderElectionService.getConfirmationFuture());
        this.dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
    }

    private JobGraph createFailingJobGraph(Exception failureCause) {
        FailingJobVertex jobVertex = new FailingJobVertex("Failing JobVertex", failureCause);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return new JobGraph(this.jobGraph.getJobID(), "Failing JobGraph", new JobVertex[]{jobVertex});
    }

    static {
        TIMEOUT = Time.seconds((long)10L);
        TEST_JOB_ID = new JobID();
    }

    private static final class ExpectedJobIdJobManagerRunnerFactory
    implements Dispatcher.JobManagerRunnerFactory {
        private final JobID expectedJobId;
        private final CountDownLatch createdJobManagerRunnerLatch;

        private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId, CountDownLatch createdJobManagerRunnerLatch) {
            this.expectedJobId = expectedJobId;
            this.createdJobManagerRunnerLatch = createdJobManagerRunnerLatch;
        }

        public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            Assert.assertEquals((Object)this.expectedJobId, (Object)jobGraph.getJobID());
            this.createdJobManagerRunnerLatch.countDown();
            return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }

    private static class FailingJobVertex
    extends JobVertex {
        private static final long serialVersionUID = 3218428829168840760L;
        private final Exception failure;

        private FailingJobVertex(String name, Exception failure) {
            super(name);
            this.failure = failure;
        }

        public void initializeOnMaster(ClassLoader loader) throws Exception {
            throw this.failure;
        }
    }

    private final class BlockingJobManagerRunnerFactory
    extends TestingJobManagerRunnerFactory {
        @Nonnull
        private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;

        BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
            super(new CompletableFuture<JobGraph>(), new CompletableFuture<ArchivedExecutionGraph>(), CompletableFuture.completedFuture(null));
            this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
        }

        @Override
        public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            this.jobManagerRunnerCreationLatch.run();
            return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }
}

