/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link HeartbeatManagerImpl} and {@link HeartbeatManagerSenderImpl}.
 *
 * <p>Tests that need real timeouts create a {@link ScheduledThreadPoolExecutor} and shut it down
 * in a {@code finally} block so that no worker thread or pending task outlives the test.
 *
 * <p>NOTE(review): managers, listeners and targets are used as raw types here; restoring their
 * type arguments requires the declarations of those classes, which are not part of this file.
 */
public class HeartbeatManagerTest extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);

    /**
     * Checks the basic heartbeat exchange: a requested heartbeat reports the incoming payload to
     * the listener, retrieves the listener's own payload and forwards it to the target, and a
     * subsequently received heartbeat is reported to the listener as well.
     */
    @Test
    public void testRegularHeartbeat() {
        long heartbeatTimeout = 1000L;
        ResourceID ownResourceID = new ResourceID("foobar");
        ResourceID targetResourceID = new ResourceID("barfoo");
        HeartbeatListener heartbeatListener = Mockito.mock(HeartbeatListener.class);
        ScheduledExecutor scheduledExecutor = Mockito.mock(ScheduledExecutor.class);
        Object expectedObject = new Object();
        Mockito.when(heartbeatListener.retrievePayload())
            .thenReturn(CompletableFuture.completedFuture(expectedObject));

        HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
            heartbeatTimeout,
            ownResourceID,
            heartbeatListener,
            new DirectExecutorService(),
            scheduledExecutor,
            LOG);
        HeartbeatTarget heartbeatTarget = Mockito.mock(HeartbeatTarget.class);
        heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);

        heartbeatManager.requestHeartbeat(targetResourceID, expectedObject);
        Mockito.verify(heartbeatListener, Mockito.times(1)).reportPayload(targetResourceID, expectedObject);
        Mockito.verify(heartbeatListener, Mockito.times(1)).retrievePayload();
        Mockito.verify(heartbeatTarget, Mockito.times(1)).receiveHeartbeat(ownResourceID, expectedObject);

        heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);
        Mockito.verify(heartbeatListener, Mockito.times(2)).reportPayload(targetResourceID, expectedObject);
    }

    /**
     * Checks that receiving a heartbeat cancels the previously scheduled timeout and schedules a
     * new one: one {@code cancel(true)} on the returned future and two {@code schedule} calls
     * with the heartbeat timeout in milliseconds are expected.
     */
    @Test
    public void testHeartbeatMonitorUpdate() {
        long heartbeatTimeout = 1000L;
        ResourceID ownResourceID = new ResourceID("foobar");
        ResourceID targetResourceID = new ResourceID("barfoo");
        HeartbeatListener heartbeatListener = Mockito.mock(HeartbeatListener.class);
        ScheduledExecutor scheduledExecutor = Mockito.mock(ScheduledExecutor.class);
        ScheduledFuture<?> scheduledFuture = Mockito.mock(ScheduledFuture.class);
        // Every scheduled timeout hands back the same mocked future so its cancellation can be verified.
        Mockito.doReturn(scheduledFuture)
            .when(scheduledExecutor)
            .schedule(Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.any(TimeUnit.class));
        Object expectedObject = new Object();
        Mockito.when(heartbeatListener.retrievePayload())
            .thenReturn(CompletableFuture.completedFuture(expectedObject));

        HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
            heartbeatTimeout,
            ownResourceID,
            heartbeatListener,
            new DirectExecutorService(),
            scheduledExecutor,
            LOG);
        HeartbeatTarget heartbeatTarget = Mockito.mock(HeartbeatTarget.class);
        heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
        heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);

        Mockito.verify(scheduledFuture, Mockito.times(1)).cancel(true);
        Mockito.verify(scheduledExecutor, Mockito.times(2))
            .schedule(Matchers.any(Runnable.class), Matchers.eq(heartbeatTimeout), Matchers.eq(TimeUnit.MILLISECONDS));
    }

    /**
     * Checks that no timeout is reported while heartbeats keep arriving within the timeout, and
     * that the listener is notified with the target's id once the heartbeats stop.
     *
     * @throws Exception if waiting for the timeout notification fails or is interrupted
     */
    @Test
    public void testHeartbeatTimeout() throws Exception {
        long heartbeatTimeout = 100L;
        int numHeartbeats = 10;
        long heartbeatInterval = 20L;
        Object payload = new Object();
        ResourceID ownResourceID = new ResourceID("foobar");
        ResourceID targetResourceID = new ResourceID("barfoo");
        TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(payload);
        Object expectedObject = new Object();
        ScheduledExecutorService timeoutScheduler = new ScheduledThreadPoolExecutor(1);

        try {
            HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
                heartbeatTimeout,
                ownResourceID,
                heartbeatListener,
                new DirectExecutorService(),
                new ScheduledExecutorServiceAdapter(timeoutScheduler),
                LOG);
            HeartbeatTarget heartbeatTarget = Mockito.mock(HeartbeatTarget.class);
            CompletableFuture<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture();
            heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);

            // Heartbeats arrive faster than the timeout, so no timeout may fire during this loop.
            for (int i = 0; i < numHeartbeats; ++i) {
                heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);
                Thread.sleep(heartbeatInterval);
            }
            Assert.assertFalse(timeoutFuture.isDone());

            // No more heartbeats are sent; the timeout has to be reported for the monitored target.
            ResourceID timeoutResourceID = timeoutFuture.get(2L * heartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.assertEquals(targetResourceID, timeoutResourceID);
        } finally {
            // Release the worker thread and drop any still-pending timeout task.
            timeoutScheduler.shutdownNow();
        }
    }

    /**
     * Wires a {@link HeartbeatManagerImpl} and a {@link HeartbeatManagerSenderImpl} to monitor
     * each other. Checks that no timeout is reported while both are running, that stopping the
     * first manager makes the second one report a timeout for it, and that both listeners have
     * seen at least half of the nominal number of heartbeats.
     *
     * @throws Exception if waiting for the timeout notification fails or is interrupted
     */
    @Test
    public void testHeartbeatCluster() throws Exception {
        long heartbeatTimeout = 100L;
        long heartbeatPeriod = 20L;
        Object object = new Object();
        Object object2 = new Object();
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID resourceID2 = new ResourceID("barfoo");
        HeartbeatListener heartbeatListener = Mockito.mock(HeartbeatListener.class);
        Mockito.when(heartbeatListener.retrievePayload())
            .thenReturn(CompletableFuture.completedFuture(object));
        TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2);
        CompletableFuture<ResourceID> futureTimeout = heartbeatListener2.getTimeoutFuture();
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
        ScheduledExecutorService scheduler2 = new ScheduledThreadPoolExecutor(1);

        try {
            HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
                heartbeatTimeout,
                resourceID,
                heartbeatListener,
                new DirectExecutorService(),
                new ScheduledExecutorServiceAdapter(scheduler),
                LOG);
            HeartbeatManagerSenderImpl heartbeatManager2 = new HeartbeatManagerSenderImpl(
                heartbeatPeriod,
                heartbeatTimeout,
                resourceID2,
                heartbeatListener2,
                new DirectExecutorService(),
                new ScheduledExecutorServiceAdapter(scheduler2),
                LOG);
            heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
            heartbeatManager2.monitorTarget(resourceID, heartbeatManager);

            Thread.sleep(2L * heartbeatTimeout);
            Assert.assertFalse(futureTimeout.isDone());

            // Once the first manager is stopped, the second one must report a timeout for it.
            heartbeatManager.stop();
            ResourceID timeoutResourceID = futureTimeout.get(2L * heartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.assertEquals(resourceID, timeoutResourceID);

            // Only half of the nominal heartbeat count is required, to tolerate scheduling jitter.
            int numberHeartbeats = (int) (2L * heartbeatTimeout / heartbeatPeriod);
            Mockito.verify(heartbeatListener, Mockito.atLeast(numberHeartbeats / 2))
                .reportPayload(resourceID2, object2);
            Assert.assertTrue(heartbeatListener2.getNumberHeartbeatReports() >= numberHeartbeats / 2);
        } finally {
            // Release both worker threads and drop any still-pending timeout or heartbeat tasks.
            scheduler.shutdownNow();
            scheduler2.shutdownNow();
        }
    }

    /**
     * Checks that a target which has been unmonitored does not trigger a heartbeat timeout:
     * waiting on the listener's timeout future must itself time out.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if the timeout future completes exceptionally
     */
    @Test
    public void testTargetUnmonitoring() throws InterruptedException, ExecutionException {
        long heartbeatTimeout = 100L;
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID targetID = new ResourceID("target");
        Object object = new Object();
        TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(object);
        ScheduledExecutorService timeoutScheduler = new ScheduledThreadPoolExecutor(1);

        try {
            HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
                heartbeatTimeout,
                resourceID,
                heartbeatListener,
                new DirectExecutorService(),
                new ScheduledExecutorServiceAdapter(timeoutScheduler),
                LOG);
            HeartbeatTarget heartbeatTarget = Mockito.mock(HeartbeatTarget.class);
            heartbeatManager.monitorTarget(targetID, heartbeatTarget);
            heartbeatManager.unmonitorTarget(targetID);

            CompletableFuture<ResourceID> timeout = heartbeatListener.getTimeoutFuture();
            try {
                timeout.get(2L * heartbeatTimeout, TimeUnit.MILLISECONDS);
                Assert.fail("Timeout should time out.");
            } catch (TimeoutException expected) {
                // Expected: the unmonitored target must never complete the timeout future.
            }
        } finally {
            // Release the worker thread and drop any still-pending timeout task.
            timeoutScheduler.shutdownNow();
        }
    }

    /**
     * Checks that the last heartbeat of a target that was never registered is reported as
     * {@code -1}.
     */
    @Test
    public void testLastHeartbeatFromUnregisteredTarget() {
        long heartbeatTimeout = 100L;
        ResourceID resourceId = ResourceID.generate();
        HeartbeatListener heartbeatListener = Mockito.mock(HeartbeatListener.class);
        HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            Executors.directExecutor(),
            Mockito.mock(ScheduledExecutor.class),
            LOG);

        try {
            Assert.assertEquals(-1L, heartbeatManager.getLastHeartbeatFrom(ResourceID.generate()));
        } finally {
            heartbeatManager.stop();
        }
    }

    /**
     * Checks that the last heartbeat of a freshly monitored target is {@code 0} and that it is
     * at least the wall-clock time sampled just before a heartbeat was received.
     */
    @Test
    public void testLastHeartbeatFrom() {
        long heartbeatTimeout = 100L;
        ResourceID resourceId = ResourceID.generate();
        HeartbeatListener heartbeatListener = Mockito.mock(HeartbeatListener.class);
        HeartbeatTarget heartbeatTarget = Mockito.mock(HeartbeatTarget.class);
        ResourceID target = ResourceID.generate();
        HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            Executors.directExecutor(),
            Mockito.mock(ScheduledExecutor.class),
            LOG);

        try {
            heartbeatManager.monitorTarget(target, heartbeatTarget);
            Assert.assertEquals(0L, heartbeatManager.getLastHeartbeatFrom(target));

            long currentTime = System.currentTimeMillis();
            heartbeatManager.receiveHeartbeat(target, null);
            Assert.assertTrue(heartbeatManager.getLastHeartbeatFrom(target) >= currentTime);
        } finally {
            heartbeatManager.stop();
        }
    }

    /**
     * Heartbeat listener for tests: completes a future with the resource id passed to the first
     * timeout notification, counts reported payloads, and always offers a fixed payload.
     */
    static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> {
        /** Completed with the resource id handed to {@link #notifyHeartbeatTimeout(ResourceID)}. */
        private final CompletableFuture<ResourceID> future = new CompletableFuture<>();
        /** Payload returned by every {@link #retrievePayload()} call. */
        private final Object payload;
        /** Number of {@link #reportPayload(ResourceID, Object)} invocations seen so far. */
        private int numberHeartbeatReports;

        /**
         * @param payload the payload this listener hands out from {@link #retrievePayload()}
         */
        TestingHeartbeatListener(Object payload) {
            this.payload = payload;
        }

        /**
         * @return the future that is completed by a heartbeat timeout notification
         */
        CompletableFuture<ResourceID> getTimeoutFuture() {
            return this.future;
        }

        /**
         * @return how many payloads have been reported to this listener
         */
        public int getNumberHeartbeatReports() {
            return this.numberHeartbeatReports;
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            // complete() is a no-op after the first call, so only the first timeout is recorded.
            this.future.complete(resourceID);
        }

        @Override
        public void reportPayload(ResourceID resourceID, Object payload) {
            // Only the number of reports matters to the tests; the payload itself is ignored.
            ++this.numberHeartbeatReports;
        }

        @Override
        public CompletableFuture<Object> retrievePayload() {
            return CompletableFuture.completedFuture(this.payload);
        }
    }
}

