/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.client.JobAttachmentClientActor;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException;
import org.apache.flink.runtime.client.JobClientActorRegistrationTimeoutException;
import org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobSubmissionClientActor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobClientActorTest
extends TestLogger {
    private static ActorSystem system;
    private static JobGraph testJobGraph;
    private static Configuration clientConfig;

    @BeforeClass
    public static void setup() {
        clientConfig = new Configuration();
        system = AkkaUtils.createLocalActorSystem((Configuration)clientConfig);
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)system);
        system = null;
    }

    @Test(expected=JobClientActorSubmissionTimeoutException.class)
    public void testSubmissionTimeout() throws Exception {
        FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        FiniteDuration timeout = jobClientActorTimeout.$times(2L);
        UUID leaderSessionID = UUID.randomUUID();
        ActorRef jobManager = system.actorOf(Props.create(PlainActor.class, (Object[])new Object[]{leaderSessionID}));
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(jobManager.path().toString(), leaderSessionID);
        Props jobClientActorProps = JobSubmissionClientActor.createActorProps((LeaderRetrievalService)testingLeaderRetrievalService, (FiniteDuration)jobClientActorTimeout, (boolean)false, (Configuration)clientConfig);
        ActorRef jobClientActor = system.actorOf(jobClientActorProps);
        Future jobExecutionResult = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.SubmitJobAndWait(testJobGraph), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)jobExecutionResult, (Duration)timeout);
    }

    @Test(expected=JobClientActorRegistrationTimeoutException.class)
    public void testRegistrationTimeout() throws Exception {
        FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        FiniteDuration timeout = jobClientActorTimeout.$times(2L);
        UUID leaderSessionID = UUID.randomUUID();
        ActorRef jobManager = system.actorOf(Props.create(PlainActor.class, (Object[])new Object[]{leaderSessionID}));
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(jobManager.path().toString(), leaderSessionID);
        Props jobClientActorProps = JobAttachmentClientActor.createActorProps((LeaderRetrievalService)testingLeaderRetrievalService, (FiniteDuration)jobClientActorTimeout, (boolean)false);
        ActorRef jobClientActor = system.actorOf(jobClientActorProps);
        Future jobExecutionResult = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)jobExecutionResult, (Duration)timeout);
    }

    @Test(expected=JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception {
        FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        FiniteDuration timeout = jobClientActorTimeout.$times(2L);
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        Props jobClientActorProps = JobSubmissionClientActor.createActorProps((LeaderRetrievalService)testingLeaderRetrievalService, (FiniteDuration)jobClientActorTimeout, (boolean)false, (Configuration)clientConfig);
        ActorRef jobClientActor = system.actorOf(jobClientActorProps);
        Future jobExecutionResult = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.SubmitJobAndWait(testJobGraph), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)jobExecutionResult, (Duration)timeout);
    }

    @Test(expected=JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception {
        FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        FiniteDuration timeout = jobClientActorTimeout.$times(2L);
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        Props jobClientActorProps = JobAttachmentClientActor.createActorProps((LeaderRetrievalService)testingLeaderRetrievalService, (FiniteDuration)jobClientActorTimeout, (boolean)false);
        ActorRef jobClientActor = system.actorOf(jobClientActorProps);
        Future jobExecutionResult = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)jobExecutionResult, (Duration)timeout);
    }

    @Test(expected=JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutAfterJobSubmission() throws Exception {
        FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        FiniteDuration timeout = jobClientActorTimeout.$times(2L);
        UUID leaderSessionID = UUID.randomUUID();
        ActorRef jobManager = system.actorOf(Props.create(JobAcceptingActor.class, (Object[])new Object[]{leaderSessionID}));
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(jobManager.path().toString(), leaderSessionID);
        Props jobClientActorProps = JobSubmissionClientActor.createActorProps((LeaderRetrievalService)testingLeaderRetrievalService, (FiniteDuration)jobClientActorTimeout, (boolean)false, (Configuration)clientConfig);
        ActorRef jobClientActor = system.actorOf(jobClientActorProps);
        Future jobExecutionResult = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.SubmitJobAndWait(testJobGraph), (Timeout)new Timeout(timeout));
        Future waitFuture = Patterns.ask((ActorRef)jobManager, (Object)new RegisterTest(), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)waitFuture, (Duration)timeout);
        jobManager.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
        Await.result((Awaitable)jobExecutionResult, (Duration)timeout);
    }

    @Test(expected=JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutAfterJobRegistration() throws Exception {
        FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        FiniteDuration timeout = jobClientActorTimeout.$times(2L);
        UUID leaderSessionID = UUID.randomUUID();
        ActorRef jobManager = system.actorOf(Props.create(JobAcceptingActor.class, (Object[])new Object[]{leaderSessionID}));
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(jobManager.path().toString(), leaderSessionID);
        Props jobClientActorProps = JobAttachmentClientActor.createActorProps((LeaderRetrievalService)testingLeaderRetrievalService, (FiniteDuration)jobClientActorTimeout, (boolean)false);
        ActorRef jobClientActor = system.actorOf(jobClientActorProps);
        Future jobExecutionResult = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), (Timeout)new Timeout(timeout));
        Future waitFuture = Patterns.ask((ActorRef)jobManager, (Object)new RegisterTest(), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)waitFuture, (Duration)timeout);
        jobManager.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
        Await.result((Awaitable)jobExecutionResult, (Duration)timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGuaranteedAnswerIfJobClientDies() throws Exception {
        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
        UUID leaderSessionID = UUID.randomUUID();
        ActorRef jobManager = system.actorOf(Props.create(JobAcceptingActor.class, (Object[])new Object[]{leaderSessionID}));
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(jobManager.path().toString(), leaderSessionID);
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
        JobListeningContext jobListeningContext = JobClient.submitJob((ActorSystem)system, (Configuration)clientConfig, (HighAvailabilityServices)highAvailabilityServices, (JobGraph)testJobGraph, (FiniteDuration)timeout, (boolean)false, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader());
        Future waitFuture = Patterns.ask((ActorRef)jobManager, (Object)new RegisterTest(), (Timeout)new Timeout(timeout));
        Await.result((Awaitable)waitFuture, (Duration)timeout);
        jobListeningContext.getJobClientActor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
        try {
            JobClient.awaitJobResult((JobListeningContext)jobListeningContext);
            Assert.fail();
        }
        catch (JobExecutionException jobExecutionException) {
        }
        finally {
            highAvailabilityServices.closeAndCleanupAllData();
        }
    }

    static {
        testJobGraph = new JobGraph("Test Job");
    }

    public static class RegisterTest {
    }

    public static class JobAcceptingActor
    extends FlinkUntypedActor {
        private final UUID leaderSessionID;
        private boolean jobAccepted = false;
        private ActorRef testFuture = ActorRef.noSender();

        public JobAcceptingActor(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
        }

        protected void handleMessage(Object message) throws Exception {
            if (message instanceof JobManagerMessages.SubmitJob) {
                this.getSender().tell((Object)new JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob)message).jobGraph().getJobID()), this.getSelf());
                this.jobAccepted = true;
                if (this.testFuture != ActorRef.noSender()) {
                    this.testFuture.tell((Object)Acknowledge.get(), this.getSelf());
                }
            } else if (message instanceof JobManagerMessages.RegisterJobClient) {
                this.getSender().tell((Object)new JobManagerMessages.RegisterJobClientSuccess(((JobManagerMessages.RegisterJobClient)message).jobID()), this.getSelf());
                this.jobAccepted = true;
                if (this.testFuture != ActorRef.noSender()) {
                    this.testFuture.tell((Object)Acknowledge.get(), this.getSelf());
                }
            } else if (message instanceof JobManagerMessages.RequestBlobManagerPort$) {
                this.getSender().tell((Object)1337, this.getSelf());
            } else if (message instanceof RegisterTest) {
                this.testFuture = this.getSender();
                if (this.jobAccepted) {
                    this.testFuture.tell((Object)Acknowledge.get(), this.getSelf());
                }
            }
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionID;
        }
    }

    public static class PlainActor
    extends FlinkUntypedActor {
        private final UUID leaderSessionID;

        public PlainActor(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
        }

        protected void handleMessage(Object message) throws Exception {
            if (message instanceof JobManagerMessages.RequestBlobManagerPort$) {
                this.getSender().tell((Object)1337, this.getSelf());
            }
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionID;
        }
    }
}

