/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.testkit.CallingThreadDispatcher;
import akka.testkit.JavaTestKit;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Int;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;

public class JobManagerHARecoveryTest
extends TestLogger {
    /** Actor system shared by all tests of this class; assigned in {@code setup()} and shut down in {@code teardown()}. */
    private static ActorSystem system;
    /** JUnit rule providing a temporary folder; {@code testJobRecoveryWhenLosingLeadership} uses a sub-folder of it as HA storage path. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Creates the local actor system that all tests of this class share.
     * A fresh, empty {@link Configuration} is used for it.
     */
    @BeforeClass
    public static void setup() {
        Configuration actorSystemConfiguration = new Configuration();
        system = AkkaUtils.createLocalActorSystem(actorSystemConfiguration);
    }

    /**
     * Shuts down the shared actor system, if one was created.
     *
     * <p>JUnit runs {@code @AfterClass} methods even when a {@code @BeforeClass} method
     * threw, in which case {@code system} is still {@code null}; the guard keeps the
     * original setup failure from being accompanied by a secondary failure here.
     */
    @AfterClass
    public static void teardown() {
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            // Drop the reference so a stale, terminated system cannot be reused.
            system = null;
        }
    }

    /**
     * Submits a checkpointing job to a {@code TestingJobManager}, revokes leadership and grants
     * it again under a new session ID, and checks that:
     * <ul>
     *   <li>the job graph is still contained in the submitted job graph store after leadership
     *       was revoked and the job-removed notification arrived,</li>
     *   <li>the job reports {@code RUNNING} again after leadership was re-granted,</li>
     *   <li>the job graph is no longer in the store after the invokables were unblocked and the
     *       job-removed notification arrived, and</li>
     *   <li>every value returned by {@code BlockingStatefulInvokable.getRecoveredStates()} is
     *       {@code >= 5}.</li>
     * </ul>
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change." The
     * try/finally structure below was reconstructed by the decompiler.
     */
    @Test
    public void testJobRecoveryWhenLosingLeadership() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        FiniteDuration jobRecoveryTimeout = new FiniteDuration(3L, TimeUnit.SECONDS);
        // Overall time budget; every ask/await below uses whatever is left of it.
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        Configuration flinkConfiguration = new Configuration();
        UUID leaderSessionID = UUID.randomUUID();
        UUID newLeaderSessionID = UUID.randomUUID();
        int slots = 2;
        // Kept outside the try block so the finally block can stop whichever actors were started.
        ActorRef archive = null;
        ActorRef jobManager = null;
        ActorRef taskManager = null;
        // HA mode "zookeeper" with a fresh temporary folder as HA storage path.
        flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.newFolder().toString());
        flinkConfiguration.setInteger("taskmanager.numberOfTaskSlots", slots);
        flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3600L);
        try {
            long[] recoveredStates;
            // Test doubles: in-memory job graph store, recoverable checkpoint store with a standalone
            // ID counter, and leader election/retrieval services that are driven manually below.
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
            MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
            RecoverableCompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore();
            StandaloneCheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
            MyCheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, (CheckpointIDCounter)checkpointCounter);
            TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
            TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
            TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
            testingHighAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, myLeaderRetrievalService);
            InstanceManager instanceManager = new InstanceManager();
            instanceManager.addInstanceListener((InstanceListener)scheduler);
            // Start the archivist actor, the blob server and the TestingJobManager actor.
            archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, (int)10, (Option)Option.empty()));
            BlobServer blobServer = new BlobServer(flinkConfiguration, testingHighAvailabilityServices.createBlobStore());
            blobServer.start();
            Props jobManagerProps = Props.create(TestingJobManager.class, (Object[])new Object[]{flinkConfiguration, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), instanceManager, scheduler, blobServer, new BlobLibraryCacheManager((PermanentBlobService)blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100L), timeout, myLeaderElectionService, mySubmittedJobGraphStore, checkpointStateFactory, jobRecoveryTimeout, new JobManagerMetricGroup((MetricRegistry)new NoOpMetricRegistry(), "localhost"), Option.empty()});
            jobManager = system.actorOf(jobManagerProps);
            AkkaActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
            // Start a TestingTaskManager and wait until it has answered the alive request.
            taskManager = TaskManager.startTaskManagerComponentsAndActor((Configuration)flinkConfiguration, (ResourceID)ResourceID.generate(), (ActorSystem)system, (HighAvailabilityServices)testingHighAvailabilityServices, (MetricRegistry)new NoOpMetricRegistry(), (String)"localhost", (Option)Option.apply((Object)"taskmanager"), (boolean)true, TestingTaskManager.class);
            AkkaActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
            Future tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
            Await.ready((Awaitable)tmAlive, (Duration)deadline.timeLeft());
            // Single-vertex job running BlockingStatefulInvokable with one subtask per slot.
            JobVertex sourceJobVertex = new JobVertex("Source");
            sourceJobVertex.setInvokableClass(BlockingStatefulInvokable.class);
            sourceJobVertex.setParallelism(slots);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceJobVertex});
            List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID());
            jobGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexId, vertexId, vertexId, new CheckpointCoordinatorConfiguration(100L, 600000L, 0L, 1, ExternalizedCheckpointSettings.none(), true), null));
            // Re-create the static latch and recovered-state array for `slots` subtasks.
            BlockingStatefulInvokable.initializeStaticHelpers(slots);
            // The notification requests are sent before leadership is granted and announced.
            Future isLeader = gateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), deadline.timeLeft());
            Future isConnectedToJobManager = tmGateway.ask((Object)new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), deadline.timeLeft());
            myLeaderElectionService.isLeader(leaderSessionID);
            myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID);
            Await.ready((Awaitable)isLeader, (Duration)deadline.timeLeft());
            Await.ready((Awaitable)isConnectedToJobManager, (Duration)deadline.timeLeft());
            // Submit the job in detached mode and wait for the reply to the submission.
            Future jobSubmitted = gateway.ask((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), deadline.timeLeft());
            Await.ready((Awaitable)jobSubmitted, (Duration)deadline.timeLeft());
            // Blocks (without timeout) until the invokables have counted the static latch down to zero.
            BlockingStatefulInvokable.awaitCompletedCheckpoints();
            // Revoke leadership; once the job-removed notification arrives, the job graph must
            // still be present in the submitted job graph store.
            Future jobRemoved = gateway.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
            myLeaderElectionService.notLeader();
            Await.ready((Awaitable)jobRemoved, (Duration)deadline.timeLeft());
            Assert.assertTrue((boolean)mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
            // Grant leadership again under a new session ID and wait for the job to report RUNNING.
            Future jobRunning = gateway.ask((Object)new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
            myLeaderElectionService.isLeader(newLeaderSessionID);
            myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID);
            Await.ready((Awaitable)jobRunning, (Duration)deadline.timeLeft());
            // Let the invokables return; after the job-removed notification the job graph
            // must no longer be in the store.
            Future jobFinished = gateway.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
            BlockingInvokable.unblock();
            Await.ready((Awaitable)jobFinished, (Duration)deadline.timeLeft());
            Assert.assertFalse((boolean)mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
            // Each entry is the value a subtask deserialized in setInitialState (the checkpoint id
            // that triggerCheckpoint serialized); all of them must be at least 5.
            for (long state : recoveredStates = BlockingStatefulInvokable.getRecoveredStates()) {
                boolean isExpected = state >= 5L;
                Assert.assertTrue((String)("Did not recover checkpoint state correctly, expecting >= 5, but state was " + state), (boolean)isExpected);
            }
        }
        finally {
            // Stop every actor that was started, regardless of how the test ended.
            if (archive != null) {
                archive.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
        }
    }

    /**
     * Checks that a failure while recovering one job graph does not keep another job graph
     * from being recovered.
     *
     * <p>The mocked {@link SubmittedJobGraphStore} reports two job IDs: recovering the first
     * throws an exception, recovering the second returns a mocked {@link SubmittedJobGraph}.
     * After leadership is granted, the list filled by {@link TestingFailingHAJobManager} must
     * contain exactly the second job ID.
     *
     * @throws Exception if the test infrastructure fails
     */
    @Test
    public void testFailingJobRecovery() throws Exception {
        final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
        final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0L, TimeUnit.SECONDS);
        final Deadline deadline = new FiniteDuration(1L, TimeUnit.MINUTES).fromNow();
        final Configuration flinkConfiguration = new Configuration();
        final UUID leaderSessionID = UUID.randomUUID();
        final JobID jobId1 = new JobID();
        final JobID jobId2 = new JobID();
        // Declared outside the try block so the finally block can stop the actor.
        ActorRef jobManager = null;

        flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

        try {
            // Store mock: jobId1 fails to recover, jobId2 recovers successfully.
            final SubmittedJobGraphStore submittedJobGraphStore = Mockito.mock(SubmittedJobGraphStore.class);
            final SubmittedJobGraph submittedJobGraph = Mockito.mock(SubmittedJobGraph.class);
            Mockito.when(submittedJobGraph.getJobId()).thenReturn(jobId2);
            Mockito.when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
            Mockito.when(submittedJobGraphStore.recoverJobGraph(Matchers.eq(jobId1))).thenThrow(new Exception("Test exception"));
            Mockito.when(submittedJobGraphStore.recoverJobGraph(Matchers.eq(jobId2))).thenReturn(submittedJobGraph);

            final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
            // Filled by TestingFailingHAJobManager for every RecoverSubmittedJob message it handles.
            final List<JobID> recoveredJobs = new ArrayList<JobID>(2);
            final BlobServer blobServer = Mockito.mock(BlobServer.class);

            // NOTE(review): the CallingThreadDispatcher presumably makes the actor process messages
            // on the sending thread, which is why the assertion can follow isLeader() directly - confirm.
            final Props jobManagerProps = Props.create(
                    TestingFailingHAJobManager.class,
                    flinkConfiguration,
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    Mockito.mock(InstanceManager.class),
                    Mockito.mock(Scheduler.class),
                    blobServer,
                    new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
                    ActorRef.noSender(),
                    new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100L),
                    timeout,
                    myLeaderElectionService,
                    submittedJobGraphStore,
                    Mockito.mock(CheckpointRecoveryFactory.class),
                    jobRecoveryTimeout,
                    new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
                    recoveredJobs)
                .withDispatcher(CallingThreadDispatcher.Id());
            jobManager = system.actorOf(jobManagerProps);

            // Wait until the actor answers an Identify request before granting leadership.
            final Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
            Await.ready(started, deadline.timeLeft());

            myLeaderElectionService.isLeader(leaderSessionID);

            // Only the job whose recovery did not throw may have been handed to the JobManager.
            Assert.assertThat(recoveredJobs, org.hamcrest.Matchers.containsInAnyOrder(jobId2));
        }
        finally {
            // Single cleanup path for both the success and the failure case.
            TestingUtils.stopActor(jobManager);
        }
    }

    /**
     * Blocking invokable that also acts as a {@link StatefulTask}.
     *
     * <p>On every triggered checkpoint it acknowledges a single operator state handle whose
     * payload is the serialized checkpoint id. On restore it deserializes that value into a
     * static array indexed by subtask, which the test reads via {@link #getRecoveredStates()}.
     * A static latch lets the test wait until every subtask has been notified of more than
     * {@link #NUM_CHECKPOINTS_TO_COMPLETE} completed checkpoints.
     */
    public static class BlockingStatefulInvokable
    extends BlockingInvokable
    implements StatefulTask {
        /** Number of checkpoint-complete notifications that must be exceeded before a subtask counts the latch down. */
        private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
        /** Latch awaited by the test; re-created with one count per subtask in {@link #initializeStaticHelpers(int)}. */
        private static volatile CountDownLatch completedCheckpointsLatch = new CountDownLatch(1);
        /** Recovered value per subtask index; re-created in {@link #initializeStaticHelpers(int)}. */
        private static volatile long[] recoveredStates = new long[0];
        /** Number of checkpoint-complete notifications this task instance has received. */
        private int completedCheckpoints = 0;
        /** Whether this task instance has already contributed its single count to the latch. */
        private boolean latchCountedDown = false;

        /**
         * Restores the state of this subtask by deserializing the single operator state
         * handle and storing the value at this subtask's index.
         *
         * <p>Subtasks whose index lies outside the current static array are ignored.
         *
         * @param taskStateHandles snapshot that must contain exactly one subtask state with
         *                         exactly one managed operator state handle
         * @throws Exception if the state stream cannot be opened or deserialized
         */
        public void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception {
            int subtaskIndex = this.getIndexInSubtaskGroup();
            if (subtaskIndex < recoveredStates.length) {
                OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles);
                try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
                    recoveredStates[subtaskIndex] = (Long) InstantiationUtil.deserializeObject((InputStream) in, this.getUserCodeClassLoader());
                }
            }
        }

        /**
         * Acknowledges the checkpoint with one operator state handle whose bytes are the
         * serialized checkpoint id.
         *
         * @param checkpointMetaData metadata of the checkpoint being triggered
         * @param checkpointOptions  unused
         * @return always {@code true}
         * @throws Exception if the checkpoint id cannot be serialized
         */
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
            long checkpointId = checkpointMetaData.getCheckpointId();
            TestByteStreamStateHandleDeepCompare byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare(
                    String.valueOf(UUID.randomUUID()),
                    InstantiationUtil.serializeObject((Object) Long.valueOf(checkpointId)));

            Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1);
            stateNameToPartitionOffsets.put("test-state", new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            OperatorStateHandle operatorStateHandle = new OperatorStateHandle(stateNameToPartitionOffsets, (StreamStateHandle) byteStreamStateHandle);

            TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
            checkpointStateHandles.putSubtaskStateByOperatorID(
                    OperatorID.fromJobVertexID((JobVertexID) this.getEnvironment().getJobVertexId()),
                    new OperatorSubtaskState(Collections.singletonList(operatorStateHandle), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));

            this.getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(0L, 0L, 0L, 0L), checkpointStateHandles);
            return true;
        }

        /** Not supported by this test task; always throws {@link UnsupportedOperationException}. */
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
            throw new UnsupportedOperationException("should not be called!");
        }

        /** Not supported by this test task; always throws {@link UnsupportedOperationException}. */
        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            throw new UnsupportedOperationException("should not be called!");
        }

        /**
         * Counts the notification and, the first time this instance has been notified of more
         * than {@link #NUM_CHECKPOINTS_TO_COMPLETE} earlier checkpoints, counts the shared
         * latch down exactly once.
         *
         * <p>The latch holds one count per subtask. Counting down on every later notification
         * as well would let a single subtask release the latch on behalf of the others.
         *
         * @param checkpointId id of the completed checkpoint (unused)
         */
        public void notifyCheckpointComplete(long checkpointId) {
            // NOTE(review): unsynchronized, like the original counter - assumes notifications for
            // one task instance are delivered sequentially; confirm against the task runtime.
            boolean thresholdExceeded = this.completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE;
            if (thresholdExceeded && !this.latchCountedDown) {
                this.latchCountedDown = true;
                completedCheckpointsLatch.countDown();
            }
        }

        /**
         * Re-creates the static latch and recovered-state array for a new test run.
         *
         * @param numSubtasks number of subtasks, i.e. latch count and array length
         */
        static void initializeStaticHelpers(int numSubtasks) {
            completedCheckpointsLatch = new CountDownLatch(numSubtasks);
            recoveredStates = new long[numSubtasks];
        }

        /**
         * Blocks until the current latch has reached zero.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        static void awaitCompletedCheckpoints() throws InterruptedException {
            completedCheckpointsLatch.await();
        }

        /**
         * @return the static array of recovered values (not a copy), indexed by subtask
         */
        static long[] getRecoveredStates() {
            return recoveredStates;
        }

        /**
         * Returns the only managed operator state handle of the only subtask state contained
         * in the snapshot.
         *
         * @param taskStateHandles snapshot to inspect
         * @return the single managed operator state handle
         * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when
         *         there is not exactly one subtask state or not exactly one handle
         */
        private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) {
            Set<?> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings();
            Preconditions.checkNotNull((Object) subtaskStateMappings);
            Preconditions.checkState(subtaskStateMappings.size() == 1);

            Map.Entry<?, ?> onlyMapping = (Map.Entry<?, ?>) subtaskStateMappings.iterator().next();
            OperatorSubtaskState subtaskState = (OperatorSubtaskState) onlyMapping.getValue();
            Preconditions.checkNotNull((Object) subtaskState);

            Collection<?> managedOperatorState = subtaskState.getManagedOperatorState();
            Preconditions.checkNotNull((Object) managedOperatorState);
            Preconditions.checkState(managedOperatorState.size() == 1);
            return (OperatorStateHandle) managedOperatorState.iterator().next();
        }
    }

    /**
     * Invokable whose {@link #invoke()} does not return until {@link #unblock()} has been called.
     *
     * <p>The blocking flag is static, so it is shared by all instances, and it is never reset
     * to {@code true} by this class.
     */
    public static class BlockingInvokable
    extends AbstractInvokable {
        /** Monitor guarding {@link #blocking}; also the object that waiting tasks wait on. */
        private static final Object lock = new Object();
        /** {@code true} while tasks should keep blocking; read and written only while holding {@link #lock}. */
        private static boolean blocking = true;

        /**
         * Blocks the calling thread until {@link #unblock()} has been called.
         *
         * <p>The flag is checked while holding the monitor. Checking it outside would allow
         * {@code unblock()} to run between the check and {@code wait()}, so the notification
         * would be missed and the task would wait forever.
         *
         * @throws Exception in particular {@link InterruptedException} if the waiting thread is interrupted
         */
        public void invoke() throws Exception {
            synchronized (lock) {
                // Loop guards against spurious wake-ups.
                while (blocking) {
                    lock.wait();
                }
            }
        }

        /**
         * Releases all tasks currently blocked in {@link #invoke()} and makes later calls return immediately.
         */
        public static void unblock() {
            synchronized (lock) {
                // Written under the monitor so waiting threads are guaranteed to observe the update.
                blocking = false;
                lock.notifyAll();
            }
        }
    }

    /**
     * Minimal in-memory {@link SubmittedJobGraphStore}: job graphs live in a plain
     * {@link HashMap} keyed by job ID. {@code start} and {@code stop} do nothing.
     */
    static class MySubmittedJobGraphStore
    implements SubmittedJobGraphStore {
        /** Job graphs currently held by this store, keyed by job ID. */
        Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();

        MySubmittedJobGraphStore() {
        }

        /** No-op; the listener is ignored. */
        public void start(SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener) throws Exception {
        }

        /** No-op. */
        public void stop() throws Exception {
        }

        /**
         * @param jobId ID of the job graph to look up
         * @return the stored job graph, or {@code null} if none is stored under that ID
         */
        public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
            // Map.get already yields null for an absent key.
            SubmittedJobGraph recovered = storedJobs.get(jobId);
            return recovered;
        }

        /** Stores the job graph under its own job ID, replacing any previous entry. */
        public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
            JobID key = jobGraph.getJobId();
            storedJobs.put(key, jobGraph);
        }

        /** Removes the entry for the given job ID, if there is one. */
        public void removeJobGraph(JobID jobId) throws Exception {
            storedJobs.remove(jobId);
        }

        /**
         * @return the key set view of the backing map (not a copy)
         */
        public Collection<JobID> getJobIds() throws Exception {
            return storedJobs.keySet();
        }

        /**
         * @param jobId ID to check
         * @return {@code true} if a job graph is stored under that ID
         */
        boolean contains(JobID jobId) {
            return storedJobs.containsKey(jobId);
        }
    }

    /**
     * {@link CheckpointRecoveryFactory} that always hands out the same pre-built checkpoint
     * store and checkpoint ID counter, regardless of the job it is asked for.
     */
    static class MyCheckpointRecoveryFactory
    implements CheckpointRecoveryFactory {
        /** Store returned by every {@code createCheckpointStore} call. */
        private final CompletedCheckpointStore sharedStore;
        /** Counter returned by every {@code createCheckpointIDCounter} call. */
        private final CheckpointIDCounter sharedCounter;

        /**
         * @param store   checkpoint store to hand out
         * @param counter checkpoint ID counter to hand out
         */
        public MyCheckpointRecoveryFactory(CompletedCheckpointStore store, CheckpointIDCounter counter) {
            sharedCounter = counter;
            sharedStore = store;
        }

        /**
         * @return the store passed to the constructor; all arguments are ignored
         */
        public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
            return sharedStore;
        }

        /**
         * @return the counter passed to the constructor; the job ID is ignored
         */
        public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
            return sharedCounter;
        }
    }

    /**
     * {@link JobManager} variant that intercepts {@code RecoverSubmittedJob} messages: instead
     * of processing them it records the job ID of the carried job graph in a caller-supplied
     * collection. Every other message is passed to the regular {@link JobManager} handler.
     */
    static class TestingFailingHAJobManager
    extends JobManager {
        /** Receives the job ID of every intercepted {@code RecoverSubmittedJob} message. */
        private final Collection<JobID> recoveredJobs;

        /**
         * Forwards all arguments except {@code recoveredJobs} to the {@link JobManager}
         * constructor, together with an empty {@link Option} as its last argument.
         *
         * @param recoveredJobs collection that intercepted job IDs are added to
         */
        public TestingFailingHAJobManager(Configuration flinkConfiguration, ScheduledExecutorService futureExecutor, Executor ioExecutor, InstanceManager instanceManager, Scheduler scheduler, BlobServer blobServer, BlobLibraryCacheManager libraryCacheManager, ActorRef archive, RestartStrategyFactory restartStrategyFactory, FiniteDuration timeout, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphs, CheckpointRecoveryFactory checkpointRecoveryFactory, FiniteDuration jobRecoveryTimeout, JobManagerMetricGroup jobManagerMetricGroup, Collection<JobID> recoveredJobs) {
            super(flinkConfiguration, futureExecutor, ioExecutor, instanceManager, scheduler, blobServer, libraryCacheManager, archive, restartStrategyFactory, timeout, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, jobRecoveryTimeout, jobManagerMetricGroup, Option.empty());
            this.recoveredJobs = recoveredJobs;
        }

        /**
         * Builds the message handler: {@code RecoverSubmittedJob} messages are recorded,
         * anything else is delegated to the handler of the super class.
         *
         * @return the partial function handling incoming messages
         */
        public PartialFunction<Object, BoxedUnit> handleMessage() {
            // Records the job ID instead of actually recovering the job.
            FI.UnitApply<JobManagerMessages.RecoverSubmittedJob> recordRecoveredJob = new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>(){

                public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
                    JobID recoveredId = submitJob.submittedJobGraph().getJobId();
                    TestingFailingHAJobManager.this.recoveredJobs.add(recoveredId);
                }
            };

            // The super handler is looked up anew for every delegated message.
            FI.UnitApply<Object> delegateToJobManager = new FI.UnitApply<Object>(){

                public void apply(Object o) throws Exception {
                    TestingFailingHAJobManager.super.handleMessage().apply(o);
                }
            };

            return ReceiveBuilder
                .match(JobManagerMessages.RecoverSubmittedJob.class, recordRecoveredJob)
                .matchAny(delegateToJobManager)
                .build();
        }
    }
}

