/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobManagerCleanupITCase
extends TestLogger {
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)system);
    }

    @Test
    public void testBlobServerCleanupFinishedJob() throws IOException {
        this.testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
    }

    @Test
    public void testBlobServerCleanupCancelledJob() throws IOException {
        this.testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
    }

    @Test
    public void testBlobServerCleanupFailedJob() throws IOException {
        this.testBlobServerCleanup(TestCase.JOB_FAILS);
    }

    @Test
    public void testBlobServerCleanupFailedSubmission() throws IOException {
        this.testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
    }

    private void testBlobServerCleanup(final TestCase testCase) throws IOException {
        int num_tasks = 2;
        final File blobBaseDir = this.tmpFolder.newFolder();
        new JavaTestKit(system){
            {
                super(x0);
                new JavaTestKit.Within(1.duration((String)"30 seconds")){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    protected void run() {
                        TestingCluster cluster = null;
                        File tempBlob = null;
                        try {
                            Configuration config = new Configuration();
                            config.setInteger("taskmanager.numberOfTaskSlots", 2);
                            config.setInteger("local.number-taskmanager", 1);
                            config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                            config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
                            config.setString("restart-strategy", "fixeddelay");
                            config.setInteger("restart-strategy.fixed-delay.attempts", 1);
                            config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1 s");
                            config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
                            cluster = new TestingCluster(config);
                            cluster.start();
                            ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                            JobVertex source = new JobVertex("Source");
                            if (testCase == TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
                                source.setInvokableClass(FailingBlockingInvokable.class);
                            } else {
                                source.setInvokableClass(NoOpInvokable.class);
                            }
                            source.setParallelism(2);
                            JobGraph jobGraph = new JobGraph("BlobCleanupTest", new JobVertex[]{source});
                            JobID jid = jobGraph.getJobID();
                            Future future = jobManagerGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), this.remaining());
                            int blobPort = (Integer)Await.result((Awaitable)future, (Duration)this.remaining());
                            tempBlob = File.createTempFile("Required", ".jar");
                            List keys = BlobClient.uploadJarFiles((InetSocketAddress)new InetSocketAddress("localhost", blobPort), (Configuration)config, (JobID)jid, Collections.singletonList(new Path(tempBlob.getAbsolutePath())));
                            Assert.assertEquals((long)1L, (long)keys.size());
                            jobGraph.addBlob((PermanentBlobKey)keys.get(0));
                            if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
                                jobGraph.addBlob(new PermanentBlobKey());
                            }
                            jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, testCase == TestCase.JOB_IS_CANCELLED ? ListeningBehaviour.DETACHED : ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)testActorGateway);
                            if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
                                this.expectMsgClass(JobManagerMessages.JobResultFailure.class);
                            } else {
                                this.expectMsgEquals(new JobManagerMessages.JobSubmitSuccess(jid));
                                if (testCase == TestCase.JOB_FAILS) {
                                    FailingBlockingInvokable.unblock();
                                    this.expectMsgClass(JobManagerMessages.JobResultFailure.class);
                                } else if (testCase == TestCase.JOB_IS_CANCELLED) {
                                    jobManagerGateway.tell((Object)new JobManagerMessages.CancelJob(jid), (ActorGateway)testActorGateway);
                                    this.expectMsgEquals(new JobManagerMessages.CancellationSuccess(jid, null));
                                } else {
                                    this.expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                                }
                            }
                            File[] blobDirs = blobBaseDir.listFiles(new FilenameFilter(){

                                @Override
                                public boolean accept(File dir, String name) {
                                    return name.startsWith("blobStore-");
                                }
                            });
                            Assert.assertNotNull((Object)blobDirs);
                            for (File blobDir : blobDirs) {
                                JobManagerCleanupITCase.waitForEmptyBlobDir(blobDir, this.remaining());
                            }
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail((String)e.getMessage());
                        }
                        finally {
                            if (cluster != null) {
                                cluster.stop();
                            }
                            if (tempBlob != null) {
                                Assert.assertTrue((boolean)tempBlob.delete());
                            }
                        }
                    }
                };
            }
        };
        Assert.assertArrayEquals((Object[])new File[0], (Object[])blobBaseDir.listFiles());
    }

    private static void waitForEmptyBlobDir(File blobDir, FiniteDuration remaining) throws InterruptedException {
        Object[] blobDirContents;
        long deadline = System.currentTimeMillis() + remaining.toMillis();
        do {
            if ((blobDirContents = blobDir.list(new FilenameFilter(){

                @Override
                public boolean accept(File dir, String name) {
                    return name.startsWith("job_");
                }
            })) == null || blobDirContents.length == 0) {
                return;
            }
            Thread.sleep(100L);
        } while (System.currentTimeMillis() < deadline);
        Assert.fail((String)("Timeout while waiting for " + blobDir.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(blobDirContents)));
    }

    private static enum TestCase {
        JOB_FINISHES_SUCESSFULLY,
        JOB_IS_CANCELLED,
        JOB_FAILS,
        JOB_SUBMISSION_FAILS;

    }
}

