/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class TaskManagerRegistrationTest
extends TestLogger {
    private static ActorSystem actorSystem;
    private static Configuration config;
    private static FiniteDuration timeout;
    private TestingHighAvailabilityServices highAvailabilityServices;

    @BeforeClass
    public static void startActorSystem() {
        config = new Configuration();
        config.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
        config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms");
        config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s");
        config.setInteger(AkkaOptions.WATCH_THRESHOLD, 2);
        actorSystem = AkkaUtils.createLocalActorSystem((Configuration)config);
    }

    @AfterClass
    public static void shutdownActorSystem() {
        if (actorSystem != null) {
            actorSystem.shutdown();
        }
    }

    @Before
    public void setupTest() {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
    }

    @After
    public void tearDownTest() throws Exception {
        this.highAvailabilityServices.closeAndCleanupAllData();
        this.highAvailabilityServices = null;
    }

    @Test
    public void testSimpleRegistration() throws Exception {
        new JavaTestKit(actorSystem){
            {
                ActorGateway jobManager = null;
                ActorGateway taskManager1 = null;
                ActorGateway taskManager2 = null;
                AkkaActorGateway resourceManager = null;
                EmbeddedHaServices embeddedHaServices = null;
                try {
                    embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
                    jobManager = TestingUtils.createJobManager(actorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), config, (HighAvailabilityServices)embeddedHaServices);
                    resourceManager = new AkkaActorGateway(TaskManagerRegistrationTest.startResourceManager(config, (HighAvailabilityServices)embeddedHaServices), jobManager.leaderSessionID());
                    taskManager1 = TestingUtils.createTaskManager(actorSystem, (HighAvailabilityServices)embeddedHaServices, config, true, false);
                    taskManager2 = TestingUtils.createTaskManager(actorSystem, (HighAvailabilityServices)embeddedHaServices, config, true, false);
                    Future responseFuture1 = taskManager1.ask((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
                    Future responseFuture2 = taskManager2.ask((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
                    Object response1 = Await.result((Awaitable)responseFuture1, (Duration)timeout);
                    Object response2 = Await.result((Awaitable)responseFuture2, (Duration)timeout);
                    Assert.assertTrue((boolean)(response1 instanceof TaskManagerMessages.RegisteredAtJobManager));
                    Assert.assertTrue((boolean)(response2 instanceof TaskManagerMessages.RegisteredAtJobManager));
                    Future numTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout);
                    Integer count = (Integer)Await.result((Awaitable)numTaskManagersFuture, (Duration)timeout);
                    Assert.assertEquals((long)2L, (long)count.intValue());
                }
                catch (Exception e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager));
                        embeddedHaServices.closeAndCleanupAllData();
                        throw throwable;
                    }
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager));
                    embeddedHaServices.closeAndCleanupAllData();
                }
                TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager));
                embeddedHaServices.closeAndCleanupAllData();
            }
        };
    }

    @Test
    public void testDelayedRegistration() throws Exception {
        new JavaTestKit(actorSystem){
            {
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                FiniteDuration delayedTimeout = timeout.$times(3L);
                EmbeddedHaServices embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
                try {
                    taskManager = TestingUtils.createTaskManager(actorSystem, (HighAvailabilityServices)embeddedHaServices, new Configuration(), true, false);
                    Thread.sleep(6000L);
                    jobManager = TestingUtils.createJobManager(actorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), new Configuration(), (HighAvailabilityServices)embeddedHaServices);
                    Future responseFuture = taskManager.ask((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), delayedTimeout);
                    Object response = Await.result((Awaitable)responseFuture, (Duration)delayedTimeout);
                    Assert.assertTrue((boolean)(response instanceof TaskManagerMessages.RegisteredAtJobManager));
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
                    embeddedHaServices.closeAndCleanupAllData();
                    throw throwable;
                }
                TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
                embeddedHaServices.closeAndCleanupAllData();
            }
        };
    }

    @Test
    public void testShutdownAfterRegistrationDurationExpired() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway taskManager = null;
                try {
                    Configuration tmConfig = new Configuration();
                    tmConfig.setString("taskmanager.maxRegistrationDuration", "500 ms");
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, new TestingLeaderRetrievalService("foobar", HighAvailabilityServices.DEFAULT_LEADER_ID));
                    taskManager = TestingUtils.createTaskManager(actorSystem, TaskManagerRegistrationTest.this.highAvailabilityServices, tmConfig, true, false);
                    this.watch(taskManager.actor());
                    final ActorGateway tm = taskManager;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectTerminated(tm.actor());
                        }
                    };
                }
                catch (Throwable e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                    finally {
                        TestingUtils.stopActorGracefully(taskManager);
                    }
                }
                TestingUtils.stopActorGracefully(taskManager);
            }
        };
    }

    @Test
    public void testTaskManagerResumesConnectAfterRefusedRegistration() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway jm = null;
                ActorGateway taskManager = null;
                try {
                    final ActorGateway jmGateway = jm = TestingUtils.createForwardingActor(actorSystem, this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, (Option<String>)Option.empty());
                    FiniteDuration refusedRegistrationPause = new FiniteDuration(500L, TimeUnit.MILLISECONDS);
                    Configuration tmConfig = new Configuration(config);
                    tmConfig.setString("taskmanager.refused-registration-pause", refusedRegistrationPause.toString());
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, new TestingLeaderRetrievalService(jm.path(), HighAvailabilityServices.DEFAULT_LEADER_ID));
                    final ActorGateway taskManagerGateway = taskManager = TestingUtils.createTaskManager(actorSystem, TaskManagerRegistrationTest.this.highAvailabilityServices, tmConfig, true, false);
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            taskManagerGateway.tell((Object)new RegistrationMessages.RefuseRegistration((Throwable)new Exception("test reason")), jmGateway);
                        }
                    };
                    FiniteDuration maxDelay = (FiniteDuration)refusedRegistrationPause.$times(3.0);
                    new JavaTestKit.Within(maxDelay){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
                    throw throwable;
                }
                TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
            }
        };
    }

    @Test
    public void testTaskManagerNoExcessiveRegistrationMessages() throws Exception {
        new JavaTestKit(actorSystem){
            {
                ActorGateway jm = null;
                ActorGateway taskManager = null;
                try {
                    FiniteDuration timeout = new FiniteDuration(5L, TimeUnit.SECONDS);
                    jm = TestingUtils.createForwardingActor(actorSystem, this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, (Option<String>)Option.empty());
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, new TestingLeaderRetrievalService(jm.path(), HighAvailabilityServices.DEFAULT_LEADER_ID));
                    ActorGateway jmGateway = jm;
                    long refusedRegistrationPause = 500L;
                    long initialRegistrationPause = 100L;
                    long maxDelay = 30000L;
                    Configuration tmConfig = new Configuration(config);
                    tmConfig.setString("taskmanager.refused-registration-pause", refusedRegistrationPause + " ms");
                    tmConfig.setString("taskmanager.initial-registration-pause", initialRegistrationPause + " ms");
                    ActorGateway taskManagerGateway = taskManager = TestingUtils.createTaskManager(actorSystem, TaskManagerRegistrationTest.this.highAvailabilityServices, tmConfig, true, false);
                    Deadline deadline = timeout.fromNow();
                    try {
                        while (deadline.hasTimeLeft()) {
                            this.expectMsgClass(deadline.timeLeft(), RegistrationMessages.RegisterTaskManager.class);
                            taskManagerGateway.tell((Object)new RegistrationMessages.RefuseRegistration((Throwable)new Exception("test reason")), jmGateway);
                        }
                    }
                    catch (AssertionError assertionError) {
                        // empty catch block
                    }
                    RegistrationMessages.RegisterTaskManager[] registerTaskManagerMessages = (RegistrationMessages.RegisterTaskManager[])new JavaTestKit.ReceiveWhile<RegistrationMessages.RegisterTaskManager>(RegistrationMessages.RegisterTaskManager.class, (Duration)timeout){

                        protected RegistrationMessages.RegisterTaskManager match(Object msg) throws Exception {
                            if (msg instanceof RegistrationMessages.RegisterTaskManager) {
                                return (RegistrationMessages.RegisterTaskManager)msg;
                            }
                            throw this.noMatch();
                        }
                    }.get();
                    int maxExponent = (int)Math.floor(Math.log((double)maxDelay / (double)initialRegistrationPause + 1.0) / Math.log(2.0));
                    int exponent = (int)Math.ceil(Math.log((double)timeout.toMillis() / (double)initialRegistrationPause + 1.0) / Math.log(2.0));
                    int exp = Math.min(maxExponent, exponent);
                    long difference = timeout.toMillis() - initialRegistrationPause * (long)(1 << exp);
                    int numberRegisterTaskManagerMessages = exp;
                    if (difference > 0L) {
                        numberRegisterTaskManagerMessages = (int)((double)numberRegisterTaskManagerMessages + Math.ceil((double)difference / (double)maxDelay));
                    }
                    int maxExpectedNumberOfRegisterTaskManagerMessages = numberRegisterTaskManagerMessages * 2;
                    Assert.assertTrue((String)("The number of RegisterTaskManager messages #" + registerTaskManagerMessages.length + " should be less than #" + maxExpectedNumberOfRegisterTaskManagerMessages), (registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages ? 1 : 0) != 0);
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
                    throw throwable;
                }
                TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
            }
        };
    }

    @Test
    public void testTaskManagerResumesConnectAfterJobManagerFailure() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway fakeJobManager1Gateway = null;
                ActorGateway fakeJobManager2Gateway = null;
                ActorGateway taskManagerGateway = null;
                String JOB_MANAGER_NAME = "ForwardingJobManager";
                try {
                    final ActorGateway fakeJM1Gateway = fakeJobManager1Gateway = TestingUtils.createForwardingActor(actorSystem, this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, (Option<String>)Option.apply((Object)"ForwardingJobManager"));
                    TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(fakeJM1Gateway.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
                    final ActorGateway tm = taskManagerGateway = TestingUtils.createTaskManager(actorSystem, TaskManagerRegistrationTest.this.highAvailabilityServices, config, true, false);
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            tm.tell((Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 45234), fakeJM1Gateway);
                        }
                    };
                    this.watch(fakeJobManager1Gateway.actor());
                    TestingUtils.stopActor(fakeJobManager1Gateway.actor());
                    final ActorGateway gateway = fakeJobManager1Gateway;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            Object message = null;
                            while (message == null || !(message instanceof Terminated)) {
                                message = this.receiveOne((Duration)timeout);
                            }
                            Terminated terminatedMessage = (Terminated)message;
                            Assert.assertEquals((Object)gateway.actor(), (Object)terminatedMessage.actor());
                        }
                    };
                    fakeJobManager1Gateway = null;
                    long deadline = 20000000000L + System.nanoTime();
                    do {
                        try {
                            fakeJobManager2Gateway = TestingUtils.createForwardingActor(actorSystem, this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, (Option<String>)Option.apply((Object)"ForwardingJobManager"));
                        }
                        catch (InvalidActorNameException e) {
                            Thread.sleep(100L);
                        }
                    } while (fakeJobManager2Gateway == null && System.nanoTime() < deadline);
                    final ActorGateway fakeJM2GatewayClosure = fakeJobManager2Gateway;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            tm.tell((Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 45234), fakeJM2GatewayClosure);
                        }
                    };
                }
                catch (Throwable e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway));
                        throw throwable;
                    }
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway));
                }
                TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway));
            }
        };
    }

    @Test
    public void testCheckForValidRegistrationSessionIDs() throws IOException {
        new JavaTestKit(actorSystem){
            {
                ActorGateway taskManagerGateway = null;
                final UUID falseLeaderSessionID = UUID.randomUUID();
                final UUID trueLeaderSessionID = UUID.randomUUID();
                HighAvailabilityServices mockedHighAvailabilityServices = (HighAvailabilityServices)Mockito.mock(HighAvailabilityServices.class);
                Mockito.when((Object)mockedHighAvailabilityServices.getJobManagerLeaderRetriever((JobID)Matchers.eq((Object)HighAvailabilityServices.DEFAULT_JOB_ID))).thenReturn((Object)new StandaloneLeaderRetrievalService(this.getTestActor().path().toString(), trueLeaderSessionID));
                Mockito.when((Object)mockedHighAvailabilityServices.createBlobStore()).thenReturn((Object)new VoidBlobStore());
                try {
                    taskManagerGateway = TestingUtils.createTaskManager(actorSystem, mockedHighAvailabilityServices, config, true, false);
                    final ActorRef taskManager = taskManagerGateway.actor();
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            taskManager.tell((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), this.getTestActor());
                            JobManagerMessages.LeaderSessionMessage lsm = (JobManagerMessages.LeaderSessionMessage)this.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
                            Assert.assertTrue((boolean)lsm.leaderSessionID().equals(trueLeaderSessionID));
                            Assert.assertTrue((boolean)(lsm.message() instanceof RegistrationMessages.RegisterTaskManager));
                            ActorRef tm = this.getLastSender();
                            tm.tell((Object)new JobManagerMessages.LeaderSessionMessage(falseLeaderSessionID, (Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 1)), this.getTestActor());
                            tm.tell((Object)new JobManagerMessages.LeaderSessionMessage(trueLeaderSessionID, (Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 1)), this.getTestActor());
                            Object message = null;
                            while (!(message instanceof TaskManagerMessages.RegisteredAtJobManager)) {
                                message = this.receiveOne((Duration)TestingUtils.TESTING_DURATION());
                            }
                            tm.tell(JobManagerMessages.getRequestLeaderSessionID(), this.getTestActor());
                            this.expectMsgEquals(new JobManagerMessages.ResponseLeaderSessionID(trueLeaderSessionID));
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActorGracefully(taskManagerGateway);
                    throw throwable;
                }
                TestingUtils.stopActorGracefully(taskManagerGateway);
            }
        };
    }

    private static ActorRef startResourceManager(Configuration config, HighAvailabilityServices highAvailabilityServices) {
        return FlinkResourceManager.startResourceManagerActors((Configuration)config, (ActorSystem)actorSystem, (LeaderRetrievalService)highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), StandaloneResourceManager.class);
    }

    static {
        timeout = new FiniteDuration(20L, TimeUnit.SECONDS);
    }
}

