/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.FaultySubmittedJobGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class DispatcherResourceCleanupTest
extends TestLogger {
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private static final Time timeout = Time.seconds((long)10L);
    private static TestingRpcService rpcService;
    private JobID jobId;
    private JobGraph jobGraph;
    private Configuration configuration;
    private TestingLeaderElectionService dispatcherLeaderElectionService;
    private SingleRunningJobsRegistry runningJobsRegistry;
    private TestingHighAvailabilityServices highAvailabilityServices;
    private OneShotLatch clearedJobLatch;
    private TestingDispatcher dispatcher;
    private DispatcherGateway dispatcherGateway;
    private TestingFatalErrorHandler fatalErrorHandler;
    private BlobServer blobServer;
    private PermanentBlobKey permanentBlobKey;
    private File blobFile;
    private CompletableFuture<BlobKey> storedHABlobFuture;
    private CompletableFuture<JobID> deleteAllHABlobsFuture;
    private CompletableFuture<ArchivedExecutionGraph> resultFuture;
    private CompletableFuture<JobID> cleanupJobFuture;
    private CompletableFuture<Void> terminationFuture;
    private FaultySubmittedJobGraphStore submittedJobGraphStore;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @Before
    public void setup() throws Exception {
        JobVertex testVertex = new JobVertex("testVertex");
        testVertex.setInvokableClass(NoOpInvokable.class);
        this.jobId = new JobID();
        this.jobGraph = new JobGraph(this.jobId, "testJob", new JobVertex[]{testVertex});
        this.jobGraph.setAllowQueuedScheduling(true);
        this.configuration = new Configuration();
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.dispatcherLeaderElectionService = new TestingLeaderElectionService();
        this.highAvailabilityServices.setDispatcherLeaderElectionService(this.dispatcherLeaderElectionService);
        this.clearedJobLatch = new OneShotLatch();
        this.runningJobsRegistry = new SingleRunningJobsRegistry(this.jobId, this.clearedJobLatch);
        this.highAvailabilityServices.setRunningJobsRegistry(this.runningJobsRegistry);
        this.submittedJobGraphStore = new FaultySubmittedJobGraphStore();
        this.highAvailabilityServices.setSubmittedJobGraphStore(this.submittedJobGraphStore);
        this.storedHABlobFuture = new CompletableFuture();
        this.deleteAllHABlobsFuture = new CompletableFuture();
        TestingBlobStore testingBlobStore = new TestingBlobStoreBuilder().setPutFunction(putArguments -> this.storedHABlobFuture.complete((BlobKey)putArguments.f2)).setDeleteAllFunction(this.deleteAllHABlobsFuture::complete).createTestingBlobStore();
        this.cleanupJobFuture = new CompletableFuture();
        this.terminationFuture = new CompletableFuture();
        this.blobServer = new TestingBlobServer(this.configuration, testingBlobStore, this.cleanupJobFuture);
        this.permanentBlobKey = this.blobServer.putPermanent(this.jobId, new byte[256]);
        this.blobFile = this.blobServer.getStorageLocation(this.jobId, (BlobKey)this.permanentBlobKey);
        this.resultFuture = new CompletableFuture();
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.dispatcher = new TestingDispatcher((RpcService)rpcService, "dispatcher" + UUID.randomUUID(), this.configuration, this.highAvailabilityServices, new TestingResourceManagerGateway(), this.blobServer, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), null, (ArchivedExecutionGraphStore)new MemoryArchivedExecutionGraphStore(), new TestingJobManagerRunnerFactory(new CompletableFuture<JobGraph>(), this.resultFuture, this.terminationFuture), this.fatalErrorHandler);
        this.dispatcher.start();
        this.dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)true));
        Assert.assertThat((Object)this.storedHABlobFuture.get(), (Matcher)Matchers.equalTo((Object)this.permanentBlobKey));
    }

    @After
    public void teardown() throws Exception {
        if (this.dispatcher != null) {
            this.dispatcher.shutDown();
            this.dispatcher.getTerminationFuture().get();
        }
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
        }
    }

    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService != null) {
            rpcService.stopService().get();
        }
    }

    @Test
    public void testBlobServerCleanupWhenJobFinished() throws Exception {
        this.submitJob();
        this.resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.FINISHED).build());
        this.terminationFuture.complete(null);
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)false));
    }

    private void submitJob() throws InterruptedException, ExecutionException {
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        submissionFuture.get();
    }

    @Test
    public void testBlobServerCleanupWhenJobNotFinished() throws Exception {
        this.submitJob();
        this.resultFuture.completeExceptionally((Throwable)new JobNotFinishedException(this.jobId));
        this.terminationFuture.complete(null);
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)false));
        try {
            this.deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not delete the HA blobs.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
        this.submitJob();
        this.dispatcher.shutDown();
        this.terminationFuture.complete(null);
        this.dispatcher.getTerminationFuture().get();
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)false));
        try {
            this.deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not delete the HA blobs.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testRunningJobsRegistryCleanup() throws Exception {
        this.submitJob();
        this.runningJobsRegistry.setJobRunning(this.jobId);
        Assert.assertThat((Object)this.runningJobsRegistry.contains(this.jobId), (Matcher)Matchers.is((Object)true));
        this.resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(this.jobId).build());
        this.terminationFuture.complete(null);
        this.clearedJobLatch.await();
        Assert.assertThat((Object)this.runningJobsRegistry.contains(this.jobId), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testJobSubmissionUnderSameJobId() throws Exception {
        this.submitJob();
        this.runningJobsRegistry.setJobRunning(this.jobId);
        this.resultFuture.completeExceptionally((Throwable)new JobNotFinishedException(this.jobId));
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        try {
            submissionFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"The job submission future should not complete until the previous JobManager termination future has been completed.");
        }
        catch (TimeoutException timeoutException) {
        }
        finally {
            this.terminationFuture.complete(null);
        }
        Assert.assertThat(submissionFuture.get(), (Matcher)Matchers.equalTo((Object)Acknowledge.get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobRecoveryWithPendingTermination() throws Exception {
        this.submitJob();
        this.runningJobsRegistry.setJobRunning(this.jobId);
        this.dispatcherLeaderElectionService.notLeader();
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture<UUID> leaderFuture = this.dispatcherLeaderElectionService.isLeader(leaderSessionId);
        try {
            leaderFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not become leader before all previously running JobMasters have terminated.");
        }
        catch (TimeoutException timeoutException) {
        }
        finally {
            this.terminationFuture.complete(null);
        }
        Assert.assertThat((Object)leaderFuture.get(), (Matcher)Matchers.equalTo((Object)leaderSessionId));
    }

    @Test
    public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception {
        this.submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future"));
        this.submitJob();
        ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.CANCELED).build();
        this.resultFuture.complete(executionGraph);
        this.terminationFuture.complete(null);
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception {
        this.submitJob();
        ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.CANCELED).build();
        this.resultFuture.complete(executionGraph);
        this.terminationFuture.complete(null);
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
    }

    private static final class TestingBlobServer
    extends BlobServer {
        private final CompletableFuture<JobID> cleanupJobFuture;

        public TestingBlobServer(Configuration config, BlobStore blobStore, CompletableFuture<JobID> cleanupJobFuture) throws IOException {
            super(config, blobStore);
            this.cleanupJobFuture = cleanupJobFuture;
        }

        public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
            boolean result = super.cleanupJob(jobId, cleanupBlobStoreFiles);
            this.cleanupJobFuture.complete(jobId);
            return result;
        }
    }

    private static final class SingleRunningJobsRegistry
    implements RunningJobsRegistry {
        @Nonnull
        private final JobID expectedJobId;
        @Nonnull
        private final OneShotLatch clearedJobLatch;
        private RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.PENDING;
        private boolean containsJob = false;

        private SingleRunningJobsRegistry(@Nonnull JobID expectedJobId, @Nonnull OneShotLatch clearedJobLatch) {
            this.expectedJobId = expectedJobId;
            this.clearedJobLatch = clearedJobLatch;
        }

        public void setJobRunning(JobID jobID) {
            this.checkJobId(jobID);
            this.containsJob = true;
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.RUNNING;
        }

        private void checkJobId(JobID jobID) {
            Preconditions.checkArgument((boolean)this.expectedJobId.equals((Object)jobID));
        }

        public void setJobFinished(JobID jobID) {
            this.checkJobId(jobID);
            this.containsJob = true;
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.DONE;
        }

        public RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
            this.checkJobId(jobID);
            return this.jobSchedulingStatus;
        }

        public boolean contains(JobID jobId) {
            this.checkJobId(jobId);
            return this.containsJob;
        }

        public void clearJob(JobID jobID) {
            this.checkJobId(jobID);
            this.containsJob = false;
            this.clearedJobLatch.trigger();
        }
    }
}

