/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class ResourceManagerTaskExecutorTest
extends TestLogger {
    private final Time timeout = Time.seconds((long)10L);
    private TestingRpcService rpcService;
    private SlotReport slotReport = new SlotReport();
    private static String taskExecutorAddress = "/taskExecutor1";
    private ResourceID taskExecutorResourceID;
    private ResourceID resourceManagerResourceID;
    private StandaloneResourceManager resourceManager;
    private ResourceManagerGateway rmGateway;
    private ResourceManagerGateway wronglyFencedGateway;
    private TestingFatalErrorHandler testingFatalErrorHandler;

    @Before
    public void setup() throws Exception {
        this.rpcService = new TestingRpcService();
        this.taskExecutorResourceID = this.mockTaskExecutor(taskExecutorAddress);
        this.resourceManagerResourceID = ResourceID.generate();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
        this.resourceManager = this.createAndStartResourceManager(rmLeaderElectionService, this.testingFatalErrorHandler);
        this.rmGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.wronglyFencedGateway = this.rpcService.connect(this.resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class).get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        this.grantLeadership(rmLeaderElectionService).get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @After
    public void teardown() throws Exception {
        this.rpcService.stopService();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegisterTaskExecutor() throws Exception {
        try {
            CompletableFuture successfulFuture = this.rmGateway.registerTaskExecutor(taskExecutorAddress, this.taskExecutorResourceID, this.slotReport, this.timeout);
            RegistrationResponse response = (RegistrationResponse)successfulFuture.get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue((boolean)(response instanceof TaskExecutorRegistrationSuccess));
            CompletableFuture duplicateFuture = this.rmGateway.registerTaskExecutor(taskExecutorAddress, this.taskExecutorResourceID, this.slotReport, this.timeout);
            RegistrationResponse duplicateResponse = (RegistrationResponse)duplicateFuture.get();
            Assert.assertTrue((boolean)(duplicateResponse instanceof TaskExecutorRegistrationSuccess));
            Assert.assertNotEquals((Object)((TaskExecutorRegistrationSuccess)response).getRegistrationId(), (Object)((TaskExecutorRegistrationSuccess)duplicateResponse).getRegistrationId());
        }
        finally {
            if (this.testingFatalErrorHandler.hasExceptionOccurred()) {
                this.testingFatalErrorHandler.rethrowError();
            }
        }
    }

    @Test
    public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
        try {
            CompletableFuture unMatchedLeaderFuture = this.wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, this.taskExecutorResourceID, this.slotReport, this.timeout);
            try {
                unMatchedLeaderFuture.get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
            }
        }
        finally {
            if (this.testingFatalErrorHandler.hasExceptionOccurred()) {
                this.testingFatalErrorHandler.rethrowError();
            }
        }
    }

    @Test
    public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
        try {
            String invalidAddress = "/taskExecutor2";
            CompletableFuture invalidAddressFuture = this.rmGateway.registerTaskExecutor(invalidAddress, this.taskExecutorResourceID, this.slotReport, this.timeout);
            Assert.assertTrue((boolean)(invalidAddressFuture.get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline));
        }
        finally {
            if (this.testingFatalErrorHandler.hasExceptionOccurred()) {
                this.testingFatalErrorHandler.rethrowError();
            }
        }
    }

    private ResourceID mockTaskExecutor(String taskExecutorAddress) {
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        ResourceID taskExecutorResourceID = ResourceID.generate();
        this.rpcService.registerGateway(taskExecutorAddress, (RpcGateway)taskExecutorGateway);
        return taskExecutorResourceID;
    }

    private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
        highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
        ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds((long)5L), Time.seconds((long)5L));
        SlotManager slotManager = new SlotManager(this.rpcService.getScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        MetricRegistryImpl metricRegistry = (MetricRegistryImpl)Mockito.mock(MetricRegistryImpl.class);
        JobLeaderIdService jobLeaderIdService = new JobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, this.rpcService.getScheduledExecutor(), Time.minutes((long)5L));
        StandaloneResourceManager resourceManager = new StandaloneResourceManager((RpcService)this.rpcService, "resourcemanager", this.resourceManagerResourceID, resourceManagerConfiguration, (HighAvailabilityServices)highAvailabilityServices, heartbeatServices, slotManager, (MetricRegistry)metricRegistry, jobLeaderIdService, fatalErrorHandler);
        resourceManager.start();
        return resourceManager;
    }

    private CompletableFuture<UUID> grantLeadership(TestingLeaderElectionService leaderElectionService) {
        UUID leaderSessionId = UUID.randomUUID();
        return leaderElectionService.isLeader(leaderSessionId);
    }
}

