/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ExecutionGraphCacheTest
extends TestLogger {
    /**
     * Checks that a second lookup of the same job is answered without asking the gateway again:
     * both lookups must yield the stubbed graph while {@code requestJob} is invoked exactly once.
     */
    @Test
    public void testExecutionGraphCaching() throws Exception {
        final JobID jobId = new JobID();
        final AccessExecutionGraph expectedGraph = Mockito.mock(AccessExecutionGraph.class);

        // The gateway always answers with an already completed future holding the mocked graph.
        final JobManagerGateway gateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(CompletableFuture.completedFuture(expectedGraph))
            .when(gateway).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));

        final Time requestTimeout = Time.milliseconds(100L);
        final Time entryTimeToLive = Time.hours(1L);

        try (ExecutionGraphCache cache = new ExecutionGraphCache(requestTimeout, entryTimeToLive)) {
            Assert.assertEquals(expectedGraph, cache.getExecutionGraph(jobId, gateway).get());
            Assert.assertEquals(expectedGraph, cache.getExecutionGraph(jobId, gateway).get());

            // Two lookups, but only a single request must have reached the gateway.
            Mockito.verify(gateway, Mockito.times(1)).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));
        }
    }

    /**
     * Checks that a cached entry is not reused once its time-to-live has passed: with a 1 ms
     * time-to-live and a sleep of that length between two lookups, the gateway is expected to
     * be asked twice.
     */
    @Test
    public void testExecutionGraphEntryInvalidation() throws Exception {
        final Time requestTimeout = Time.milliseconds(100L);
        final Time entryTimeToLive = Time.milliseconds(1L);
        final JobID jobId = new JobID();
        final AccessExecutionGraph expectedGraph = Mockito.mock(AccessExecutionGraph.class);

        final JobManagerGateway gateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(CompletableFuture.completedFuture(expectedGraph))
            .when(gateway).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));

        try (ExecutionGraphCache cache = new ExecutionGraphCache(requestTimeout, entryTimeToLive)) {
            Assert.assertEquals(expectedGraph, cache.getExecutionGraph(jobId, gateway).get());

            // Wait for the length of the time-to-live before looking the job up again.
            Thread.sleep(entryTimeToLive.toMilliseconds());

            Assert.assertEquals(expectedGraph, cache.getExecutionGraph(jobId, gateway).get());

            // Each lookup must have resulted in its own gateway request.
            Mockito.verify(gateway, Mockito.times(2)).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));
        }
    }

    /**
     * Checks that a failed lookup does not stay in the cache: the gateway first answers with a
     * {@link FlinkJobNotFoundException} and afterwards with a graph, and the second lookup must
     * return that graph even though the time-to-live is one hour.
     */
    @Test
    public void testImmediateCacheInvalidationAfterFailure() throws Exception {
        final JobID jobId = new JobID();
        final AccessExecutionGraph expectedGraph = Mockito.mock(AccessExecutionGraph.class);

        final CompletableFuture<AccessExecutionGraph> failedResponse =
            FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        final CompletableFuture<AccessExecutionGraph> successfulResponse =
            CompletableFuture.completedFuture(expectedGraph);

        // First gateway call fails, every later call succeeds.
        final JobManagerGateway gateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(failedResponse)
            .doReturn(successfulResponse)
            .when(gateway).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));

        try (ExecutionGraphCache cache = new ExecutionGraphCache(Time.milliseconds(100L), Time.hours(1L))) {
            final CompletableFuture<AccessExecutionGraph> firstLookup = cache.getExecutionGraph(jobId, gateway);
            try {
                firstLookup.get();
                Assert.fail("The execution graph future should have been completed exceptionally.");
            } catch (ExecutionException expected) {
                Assert.assertTrue(expected.getCause() instanceof FlinkException);
            }

            final CompletableFuture<AccessExecutionGraph> secondLookup = cache.getExecutionGraph(jobId, gateway);
            Assert.assertEquals(expectedGraph, secondLookup.get());
        }
    }

    /**
     * Checks that {@code cleanup()} empties the cache of expired entries: two jobs are looked up
     * with a 1 ms time-to-live, the test sleeps for that long, and after {@code cleanup()} the
     * cache must report a size of zero.
     */
    @Test
    public void testCacheEntryCleanup() throws Exception {
        final Time requestTimeout = Time.milliseconds(100L);
        final Time entryTimeToLive = Time.milliseconds(1L);

        final JobID firstJobId = new JobID();
        final JobID secondJobId = new JobID();
        final AccessExecutionGraph firstGraph = Mockito.mock(AccessExecutionGraph.class);
        final AccessExecutionGraph secondGraph = Mockito.mock(AccessExecutionGraph.class);

        // One gateway serving a distinct graph per job id.
        final JobManagerGateway gateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(CompletableFuture.completedFuture(firstGraph))
            .when(gateway).requestJob(Matchers.eq(firstJobId), Matchers.any(Time.class));
        Mockito.doReturn(CompletableFuture.completedFuture(secondGraph))
            .when(gateway).requestJob(Matchers.eq(secondJobId), Matchers.any(Time.class));

        try (ExecutionGraphCache cache = new ExecutionGraphCache(requestTimeout, entryTimeToLive)) {
            // Issue both lookups before waiting on either result.
            final CompletableFuture<AccessExecutionGraph> firstLookup = cache.getExecutionGraph(firstJobId, gateway);
            final CompletableFuture<AccessExecutionGraph> secondLookup = cache.getExecutionGraph(secondJobId, gateway);

            Assert.assertEquals(firstGraph, firstLookup.get());
            Assert.assertEquals(secondGraph, secondLookup.get());

            Mockito.verify(gateway, Mockito.times(1)).requestJob(Matchers.eq(firstJobId), Matchers.any(Time.class));
            Mockito.verify(gateway, Mockito.times(1)).requestJob(Matchers.eq(secondJobId), Matchers.any(Time.class));

            // Wait for the length of the time-to-live, then trigger the explicit cleanup.
            Thread.sleep(entryTimeToLive.toMilliseconds());
            cache.cleanup();

            Assert.assertTrue(cache.size() == 0);
        }
    }

    /**
     * Checks that concurrent lookups of the same job share a single gateway request: ten lookups
     * are submitted to a thread pool, every one of them must yield the stubbed graph, and
     * {@code requestJob} must have been invoked exactly once.
     */
    @Test
    public void testConcurrentAccess() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.hours(1L);
        final JobID jobId = new JobID();
        final AccessExecutionGraph accessExecutionGraph = Mockito.mock(AccessExecutionGraph.class);

        final JobManagerGateway jobManagerGateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(CompletableFuture.completedFuture(accessExecutionGraph))
            .when(jobManagerGateway).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));

        final int numConcurrentAccesses = 10;
        final Collection<CompletableFuture<AccessExecutionGraph>> executionGraphFutures =
            new ArrayList<>(numConcurrentAccesses);
        final ExecutorService executor = Executors.newFixedThreadPool(numConcurrentAccesses);

        try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) {
            for (int i = 0; i < numConcurrentAccesses; i++) {
                // The lookup itself runs on the pool; thenCompose flattens the nested future.
                final CompletableFuture<AccessExecutionGraph> executionGraphFuture = CompletableFuture
                    .supplyAsync(() -> executionGraphCache.getExecutionGraph(jobId, jobManagerGateway), executor)
                    .thenCompose(Function.identity());
                executionGraphFutures.add(executionGraphFuture);
            }

            final Collection<AccessExecutionGraph> allExecutionGraphs =
                FutureUtils.combineAll(executionGraphFutures).get();

            for (AccessExecutionGraph executionGraph : allExecutionGraphs) {
                Assert.assertEquals(accessExecutionGraph, executionGraph);
            }

            Mockito.verify(jobManagerGateway, Mockito.times(1)).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));
        } finally {
            // Shut the pool down on both the success and the failure path.
            ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
        }
    }

    /**
     * Checks that a graph reported in state {@link JobStatus#SUSPENDED} is not served from the
     * cache again: the first lookup returns the suspended graph, the second lookup must return
     * the graph the gateway hands out next.
     */
    @Test
    public void testCacheInvalidationIfSuspended() throws Exception {
        final JobID jobId = new JobID();
        final AccessExecutionGraph regularGraph = Mockito.mock(AccessExecutionGraph.class);

        final AccessExecutionGraph suspendedGraph = Mockito.mock(AccessExecutionGraph.class);
        Mockito.doReturn(JobStatus.SUSPENDED).when(suspendedGraph).getState();

        // First gateway call yields the suspended graph, later calls the regular one.
        final JobManagerGateway gateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(CompletableFuture.completedFuture(suspendedGraph))
            .doReturn(CompletableFuture.completedFuture(regularGraph))
            .when(gateway).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));

        try (ExecutionGraphCache cache = new ExecutionGraphCache(Time.milliseconds(100L), Time.hours(1L))) {
            Assert.assertEquals(suspendedGraph, cache.getExecutionGraph(jobId, gateway).get());
            Assert.assertEquals(regularGraph, cache.getExecutionGraph(jobId, gateway).get());
        }
    }

    /**
     * Checks that an already cached graph is dropped once it switches to
     * {@link JobStatus#SUSPENDED}: after the switch the next lookup must fetch a fresh graph from
     * the gateway, and that fresh graph must then be served from the cache (two gateway requests
     * for three lookups).
     */
    @Test
    public void testCacheInvalidationIfSwitchToSuspended() throws Exception {
        final JobID jobId = new JobID();
        final AccessExecutionGraph replacementGraph = Mockito.mock(AccessExecutionGraph.class);
        final SuspendableAccessExecutionGraph switchingGraph = new SuspendableAccessExecutionGraph(jobId);

        // First gateway call yields the graph that will be suspended, later calls the replacement.
        final JobManagerGateway gateway = Mockito.mock(JobManagerGateway.class);
        Mockito.doReturn(CompletableFuture.completedFuture(switchingGraph))
            .doReturn(CompletableFuture.completedFuture(replacementGraph))
            .when(gateway).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));

        try (ExecutionGraphCache cache = new ExecutionGraphCache(Time.milliseconds(100L), Time.hours(1L))) {
            Assert.assertEquals(switchingGraph, cache.getExecutionGraph(jobId, gateway).get());

            // Flip the state of the graph that is now held by the cache.
            switchingGraph.setJobStatus(JobStatus.SUSPENDED);

            Assert.assertEquals(replacementGraph, cache.getExecutionGraph(jobId, gateway).get());
            Assert.assertEquals(replacementGraph, cache.getExecutionGraph(jobId, gateway).get());

            Mockito.verify(gateway, Mockito.times(2)).requestJob(Matchers.eq(jobId), Matchers.any(Time.class));
        }
    }

    private static final class SuspendableAccessExecutionGraph
    extends ArchivedExecutionGraph {
        private static final long serialVersionUID = -6796543726305778101L;
        private JobStatus jobStatus = super.getState();

        public SuspendableAccessExecutionGraph(JobID jobId) {
            super(jobId, "ExecutionGraphCacheTest", Collections.emptyMap(), Collections.emptyList(), new long[0], JobStatus.RUNNING, new ErrorInfo((Throwable)new FlinkException("Test"), 42L), "", new StringifiedAccumulatorResult[0], Collections.emptyMap(), new ArchivedExecutionConfig(new ExecutionConfig()), false, null, null);
        }

        public JobStatus getState() {
            return this.jobStatus;
        }

        public void setJobStatus(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }
    }
}

