/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshot;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class HeapInternalTimerServiceTest {
    // Maximum parallelism of this parameterized run; forwarded to the timer-service and
    // key-selection helpers used by the tests below.
    private final int maxParallelism;
    // Key-group range under test, built from the start/end indices handed to the constructor.
    private final KeyGroupRange testKeyGroupRange;

    /**
     * Registers a Mockito "any" argument matcher and returns its placeholder value typed as an
     * {@code InternalTimer<Integer, String>}, so call sites need no cast of their own.
     *
     * @return the placeholder value produced by {@code Mockito.any()}
     */
    private static InternalTimer<Integer, String> anyInternalTimer() {
        return Mockito.<InternalTimer<Integer, String>>any();
    }

    /**
     * Creates one instance of the parameterized test.
     *
     * @param startKeyGroup start index handed to the {@link KeyGroupRange} under test
     * @param endKeyGroup end index handed to the {@link KeyGroupRange} under test
     * @param maxParallelism maximum parallelism used by the tests of this instance
     */
    public HeapInternalTimerServiceTest(int startKeyGroup, int endKeyGroup, int maxParallelism) {
        this.maxParallelism = maxParallelism;
        this.testKeyGroupRange = new KeyGroupRange(startKeyGroup, endKeyGroup);
    }

    /**
     * Builds a timer service for the key-group range 7..21 and checks that it reports 7 as its
     * local key-group-range start index.
     */
    @Test
    public void testKeyGroupStartIndexSetting() {
        final int firstKeyGroup = 7;
        final int lastKeyGroup = 21;
        final KeyGroupRange keyGroups = new KeyGroupRange(firstKeyGroup, lastKeyGroup);

        final HeapInternalTimerService<Integer, String> service =
                new HeapInternalTimerService<>(
                        keyGroups.getNumberOfKeyGroups(),
                        keyGroups,
                        new TestKeyContext(),
                        new TestProcessingTimeService());

        Assert.assertEquals(firstKeyGroup, service.getLocalKeyGroupRangeStartIdx());
    }

    /**
     * Registers 100 timers (one per key, as both event-time and processing-time timers) on a
     * service that owns key groups 0..99 and checks that the per-key-group timer sets exposed by
     * the service match the sets computed independently with
     * {@link KeyGroupRangeAssignment#assignToKeyGroup}.
     *
     * <p>Expected and actual timers are compared as sets. Comparing {@code toArray()} snapshots
     * would make the test depend on the iteration order of two independently built hash sets.
     */
    @Test
    public void testTimerAssignmentToKeyGroups() {
        final int totalNoOfTimers = 100;
        final int totalNoOfKeyGroups = 100;
        final int startKeyGroupIdx = 0;
        final int endKeyGroupIdx = totalNoOfKeyGroups - 1;

        // Slot i stays null when no timer is expected in key group i.
        @SuppressWarnings("unchecked")
        final Set<InternalTimer<Integer, String>>[] expectedNonEmptyTimerSets = new HashSet[totalNoOfKeyGroups];

        final TestKeyContext keyContext = new TestKeyContext();
        final HeapInternalTimerService<Integer, String> timerService =
                new HeapInternalTimerService<>(
                        totalNoOfKeyGroups,
                        new KeyGroupRange(startKeyGroupIdx, endKeyGroupIdx),
                        keyContext,
                        new TestProcessingTimeService());

        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        timerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);

        for (int i = 0; i < totalNoOfTimers; ++i) {
            final InternalTimer<Integer, String> timer = new InternalTimer<>(10 + i, i, "hello_world_" + i);
            final int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNoOfKeyGroups);

            // Record the timer in the expected set of its key group.
            Set<InternalTimer<Integer, String>> timerSet = expectedNonEmptyTimerSets[keyGroupIdx];
            if (timerSet == null) {
                timerSet = new HashSet<>();
                expectedNonEmptyTimerSets[keyGroupIdx] = timerSet;
            }
            timerSet.add(timer);

            // Register the same timer as both an event-time and a processing-time timer.
            keyContext.setCurrentKey(timer.getKey());
            timerService.registerEventTimeTimer(timer.getNamespace(), timer.getTimestamp());
            timerService.registerProcessingTimeTimer(timer.getNamespace(), timer.getTimestamp());
        }

        final Set<InternalTimer<Integer, String>>[] eventTimeTimers = timerService.getEventTimeTimersPerKeyGroup();
        final Set<InternalTimer<Integer, String>>[] processingTimeTimers = timerService.getProcessingTimeTimersPerKeyGroup();

        for (int i = 0; i < expectedNonEmptyTimerSets.length; ++i) {
            final Set<InternalTimer<Integer, String>> expected = expectedNonEmptyTimerSets[i];
            final Set<InternalTimer<Integer, String>> actualEvent = eventTimeTimers[i];
            final Set<InternalTimer<Integer, String>> actualProcessing = processingTimeTimers[i];

            if (expected == null) {
                Assert.assertNull(actualEvent);
                Assert.assertNull(actualProcessing);
            } else {
                // Order-independent comparison: both sides are sets.
                Assert.assertEquals("event-time timers of key group " + i, expected, actualEvent);
                Assert.assertEquals("processing-time timers of key group " + i, expected, actualProcessing);
            }
        }
    }

    /**
     * Registers five processing-time timers under a single key (three in namespace "ciao", two
     * in "hello") and asserts that, while logical timers remain, the
     * {@link TestProcessingTimeService} holds exactly one active timer located at the earliest
     * outstanding timestamp as time advances 10, 20, 30.
     */
    @Test
    public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, keyContext, timeService, testKeyGroupRange, maxParallelism);

        final int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        keyContext.setCurrentKey(key);

        timers.registerProcessingTimeTimer("ciao", 10);
        timers.registerProcessingTimeTimer("ciao", 20);
        timers.registerProcessingTimeTimer("ciao", 30);
        timers.registerProcessingTimeTimer("hello", 10);
        timers.registerProcessingTimeTimer("hello", 20);

        // Five logical timers, but only one physical timer at the earliest timestamp.
        Assert.assertEquals(5, timers.numProcessingTimeTimers());
        Assert.assertEquals(2, timers.numProcessingTimeTimers("hello"));
        Assert.assertEquals(3, timers.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(10L));

        timeService.setCurrentTime(10);

        Assert.assertEquals(3, timers.numProcessingTimeTimers());
        Assert.assertEquals(1, timers.numProcessingTimeTimers("hello"));
        Assert.assertEquals(2, timers.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(20L));

        timeService.setCurrentTime(20);

        Assert.assertEquals(1, timers.numProcessingTimeTimers());
        Assert.assertEquals(0, timers.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1, timers.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(30L));

        timeService.setCurrentTime(30);

        // Everything has fired: no logical and no physical timers are left.
        Assert.assertEquals(0, timers.numProcessingTimeTimers());
        Assert.assertEquals(0, timeService.getNumActiveTimers());

        // A fresh registration brings one physical timer back.
        timers.registerProcessingTimeTimer("ciao", 40);
        Assert.assertEquals(1, timeService.getNumActiveTimers());
    }

    /**
     * Registers a processing-time timer at 20 followed by one at 10 and asserts that the single
     * active timer of the {@link TestProcessingTimeService} moves from timestamp 20 to 10.
     */
    @Test
    public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, keyContext, timeService, testKeyGroupRange, maxParallelism);

        final int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        keyContext.setCurrentKey(key);

        timers.registerProcessingTimeTimer("ciao", 20);

        Assert.assertEquals(1, timers.numProcessingTimeTimers());
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(20L));

        // The earlier timer replaces the physical timer instead of adding a second one.
        timers.registerProcessingTimeTimer("ciao", 10);

        Assert.assertEquals(2, timers.numProcessingTimeTimers());
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(10L));
    }

    /**
     * Stubs the mocked {@link Triggerable} so that its {@code onProcessingTime} callback registers
     * a follow-up timer, then asserts that after each firing the {@link TestProcessingTimeService}
     * still holds exactly one active timer, located at the follow-up timestamp.
     */
    @Test
    public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, keyContext, timeService, testKeyGroupRange, maxParallelism);

        final int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        keyContext.setCurrentKey(key);

        timers.registerProcessingTimeTimer("ciao", 10);

        Assert.assertEquals(1, timers.numProcessingTimeTimers());
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(10L));

        // When the timer at 10 fires, the callback registers a new timer at 20.
        final Answer<Object> registerTimerAt20 = new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Exception {
                timers.registerProcessingTimeTimer("ciao", 20);
                return null;
            }
        };
        Mockito.doAnswer(registerTimerAt20).when(triggerable).onProcessingTime(anyInternalTimer());

        timeService.setCurrentTime(10);

        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(20L));

        // Re-stub: when the timer at 20 fires, the callback registers a new timer at 30.
        final Answer<Object> registerTimerAt30 = new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Exception {
                timers.registerProcessingTimeTimer("ciao", 30);
                return null;
            }
        };
        Mockito.doAnswer(registerTimerAt30).when(triggerable).onProcessingTime(anyInternalTimer());

        timeService.setCurrentTime(20);

        Assert.assertEquals(1, timers.numProcessingTimeTimers());
        Assert.assertEquals(1, timeService.getNumActiveTimers());
        Assert.assertThat(timeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(30L));
    }

    /**
     * Asserts that {@code currentProcessingTime()} of the timer service returns the time most
     * recently set on the {@link TestProcessingTimeService}.
     */
    @Test
    public void testCurrentProcessingTime() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, new TestKeyContext(), timeService, testKeyGroupRange, maxParallelism);

        timeService.setCurrentTime(17);
        Assert.assertEquals(17, timers.currentProcessingTime());

        timeService.setCurrentTime(42);
        Assert.assertEquals(42, timers.currentProcessingTime());
    }

    /**
     * Asserts that {@code currentWatermark()} returns the value most recently passed to
     * {@code advanceWatermark}.
     */
    @Test
    public void testCurrentEventTime() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(
                        triggerable,
                        new TestKeyContext(),
                        new TestProcessingTimeService(),
                        testKeyGroupRange,
                        maxParallelism);

        timers.advanceWatermark(17);
        Assert.assertEquals(17, timers.currentWatermark());

        timers.advanceWatermark(42);
        Assert.assertEquals(42, timers.currentWatermark());
    }

    /**
     * Registers event-time timers at timestamp 10 for two distinct keys and two namespaces,
     * advances the watermark to 10 and verifies that each of the four timers is delivered to
     * {@code onEventTime} exactly once and that no event-time timers remain afterwards.
     */
    @Test
    public void testSetAndFireEventTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, keyContext, timeService, testKeyGroupRange, maxParallelism);

        // Draw two different keys from the tested key-group range.
        final int firstKey = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int secondKey;
        do {
            secondKey = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (secondKey == firstKey);

        keyContext.setCurrentKey(firstKey);
        timers.registerEventTimeTimer("ciao", 10);
        timers.registerEventTimeTimer("hello", 10);

        keyContext.setCurrentKey(secondKey);
        timers.registerEventTimeTimer("ciao", 10);
        timers.registerEventTimeTimer("hello", 10);

        Assert.assertEquals(4, timers.numEventTimeTimers());
        Assert.assertEquals(2, timers.numEventTimeTimers("hello"));
        Assert.assertEquals(2, timers.numEventTimeTimers("ciao"));

        timers.advanceWatermark(10);

        Mockito.verify(triggerable, Mockito.times(4)).onEventTime(anyInternalTimer());
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, firstKey, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, firstKey, "hello")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, secondKey, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, secondKey, "hello")));

        Assert.assertEquals(0, timers.numEventTimeTimers());
    }

    /**
     * Registers processing-time timers at timestamp 10 for two distinct keys and two namespaces,
     * sets the processing time to 10 and verifies that each of the four timers is delivered to
     * {@code onProcessingTime} exactly once and that no processing-time timers remain afterwards.
     */
    @Test
    public void testSetAndFireProcessingTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, keyContext, timeService, testKeyGroupRange, maxParallelism);

        // Draw two different keys from the tested key-group range.
        final int firstKey = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int secondKey;
        do {
            secondKey = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (secondKey == firstKey);

        keyContext.setCurrentKey(firstKey);
        timers.registerProcessingTimeTimer("ciao", 10);
        timers.registerProcessingTimeTimer("hello", 10);

        keyContext.setCurrentKey(secondKey);
        timers.registerProcessingTimeTimer("ciao", 10);
        timers.registerProcessingTimeTimer("hello", 10);

        Assert.assertEquals(4, timers.numProcessingTimeTimers());
        Assert.assertEquals(2, timers.numProcessingTimeTimers("hello"));
        Assert.assertEquals(2, timers.numProcessingTimeTimers("ciao"));

        timeService.setCurrentTime(10);

        Mockito.verify(triggerable, Mockito.times(4)).onProcessingTime(anyInternalTimer());
        Mockito.verify(triggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, firstKey, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, firstKey, "hello")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, secondKey, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, secondKey, "hello")));

        Assert.assertEquals(0, timers.numProcessingTimeTimers());
    }

    /**
     * Registers four event-time timers (two keys, two namespaces), deletes one timer per key and
     * verifies that advancing the watermark fires only the two remaining timers.
     */
    @Test
    public void testDeleteEventTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService timeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timers =
                createTimerService(triggerable, keyContext, timeService, testKeyGroupRange, maxParallelism);

        // Draw two different keys from the tested key-group range.
        final int firstKey = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int secondKey;
        do {
            secondKey = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (secondKey == firstKey);

        keyContext.setCurrentKey(firstKey);
        timers.registerEventTimeTimer("ciao", 10);
        timers.registerEventTimeTimer("hello", 10);

        keyContext.setCurrentKey(secondKey);
        timers.registerEventTimeTimer("ciao", 10);
        timers.registerEventTimeTimer("hello", 10);

        Assert.assertEquals(4, timers.numEventTimeTimers());
        Assert.assertEquals(2, timers.numEventTimeTimers("hello"));
        Assert.assertEquals(2, timers.numEventTimeTimers("ciao"));

        // Remove ("hello", firstKey) and ("ciao", secondKey); the other two must survive.
        keyContext.setCurrentKey(firstKey);
        timers.deleteEventTimeTimer("hello", 10);

        keyContext.setCurrentKey(secondKey);
        timers.deleteEventTimeTimer("ciao", 10);

        Assert.assertEquals(2, timers.numEventTimeTimers());
        Assert.assertEquals(1, timers.numEventTimeTimers("hello"));
        Assert.assertEquals(1, timers.numEventTimeTimers("ciao"));

        timers.advanceWatermark(10);

        Mockito.verify(triggerable, Mockito.times(2)).onEventTime(anyInternalTimer());
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, firstKey, "ciao")));
        Mockito.verify(triggerable, Mockito.times(0))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, firstKey, "hello")));
        Mockito.verify(triggerable, Mockito.times(0))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, secondKey, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, secondKey, "hello")));

        Assert.assertEquals(0, timers.numEventTimeTimers());
    }

    /**
     * Registers four processing-time timers (two keys, two namespaces), deletes one timer per key
     * and verifies that advancing the processing time fires only the two remaining timers.
     *
     * <p>The closing assertion checks the processing-time timer count. No event-time timers are
     * registered in this test, so checking the event-time count there could never fail.
     */
    @Test
    public void testDeleteProcessingTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timerService =
                createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);

        // Draw two different keys from the tested key-group range.
        final int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        while (key2 == key1) {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        }

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10);
        timerService.registerProcessingTimeTimer("hello", 10);

        keyContext.setCurrentKey(key2);
        timerService.registerProcessingTimeTimer("ciao", 10);
        timerService.registerProcessingTimeTimer("hello", 10);

        Assert.assertEquals(4, timerService.numProcessingTimeTimers());
        Assert.assertEquals(2, timerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(2, timerService.numProcessingTimeTimers("ciao"));

        // Remove ("hello", key1) and ("ciao", key2); the other two must survive.
        keyContext.setCurrentKey(key1);
        timerService.deleteProcessingTimeTimer("hello", 10);

        keyContext.setCurrentKey(key2);
        timerService.deleteProcessingTimeTimer("ciao", 10);

        Assert.assertEquals(2, timerService.numProcessingTimeTimers());
        Assert.assertEquals(1, timerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1, timerService.numProcessingTimeTimers("ciao"));

        processingTimeService.setCurrentTime(10);

        Mockito.verify(mockTriggerable, Mockito.times(2)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, key1, "ciao")));
        Mockito.verify(mockTriggerable, Mockito.times(0))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, key1, "hello")));
        Mockito.verify(mockTriggerable, Mockito.times(0))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, key2, "ciao")));
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, key2, "hello")));

        // All processing-time timers have either been deleted or fired.
        Assert.assertEquals(0, timerService.numProcessingTimeTimers());
    }

    /** Runs the snapshot/restore round trip with snapshot version {@code 1}. */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        final int snapshotVersion = 1;
        testSnapshotAndRestore(snapshotVersion);
    }

    /**
     * Runs the snapshot/restore round trip with {@code Integer.MIN_VALUE} as the snapshot version
     * (presumably the marker for the pre-versioned format, going by the test name; confirm
     * against {@code InternalTimersSnapshotReaderWriters}).
     */
    @Test
    public void testSnapshotAndRestorePreVersioned() throws Exception {
        final int snapshotVersion = Integer.MIN_VALUE;
        testSnapshotAndRestore(snapshotVersion);
    }

    /** Runs the snapshot/rebalancing-restore scenario with snapshot version {@code 1}. */
    @Test
    public void testSnapshotAndRebalancingRestore() throws Exception {
        final int snapshotVersion = 1;
        testSnapshotAndRebalancingRestore(snapshotVersion);
    }

    /**
     * Runs the snapshot/rebalancing-restore scenario with {@code Integer.MIN_VALUE} as the
     * snapshot version (presumably the marker for the pre-versioned format, going by the test
     * name; confirm against {@code InternalTimersSnapshotReaderWriters}).
     */
    @Test
    public void testSnapshotAndRebalancingRestorePreVersioned() throws Exception {
        final int snapshotVersion = Integer.MIN_VALUE;
        testSnapshotAndRebalancingRestore(snapshotVersion);
    }

    /**
     * Round-trips a timer service through per-key-group snapshots written with the given version
     * and verifies that the restored service fires every registered timer exactly once.
     *
     * <p>The snapshot stream is managed with try-with-resources, so an exception thrown while
     * writing a key group's snapshot propagates and fails the test.
     *
     * @param snapshotVersion version passed to
     *     {@code InternalTimersSnapshotReaderWriters.getWriterForVersion} and to
     *     {@code restoreTimerService}
     * @throws Exception if snapshotting, restoring, or firing the timers fails
     */
    private void testSnapshotAndRestore(int snapshotVersion) throws Exception {
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        final TestKeyContext keyContext = new TestKeyContext();
        final TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> timerService =
                createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);

        // Draw two different keys from the tested key-group range.
        final int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        while (key2 == key1) {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        }

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10);
        timerService.registerEventTimeTimer("hello", 10);

        keyContext.setCurrentKey(key2);
        timerService.registerEventTimeTimer("ciao", 10);
        timerService.registerProcessingTimeTimer("hello", 10);

        Assert.assertEquals(2, timerService.numProcessingTimeTimers());
        Assert.assertEquals(1, timerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(2, timerService.numEventTimeTimers());
        Assert.assertEquals(1, timerService.numEventTimeTimers("hello"));
        Assert.assertEquals(1, timerService.numEventTimeTimers("ciao"));

        // Serialize the timers of every key group in the tested range into its own byte array.
        final Map<Integer, byte[]> snapshot = new HashMap<>();
        for (Integer keyGroupIndex : testKeyGroupRange) {
            try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
                final InternalTimersSnapshot<?, ?> timersSnapshot =
                        timerService.snapshotTimersForKeyGroup(keyGroupIndex);

                InternalTimersSnapshotReaderWriters
                        .getWriterForVersion(snapshotVersion, timersSnapshot)
                        .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream));

                snapshot.put(keyGroupIndex, outStream.toByteArray());
            }
        }

        // Restore into a fresh service with its own mock, key context and time service.
        @SuppressWarnings("unchecked")
        final Triggerable<Integer, String> mockTriggerable2 = Mockito.mock(Triggerable.class);
        final TestKeyContext restoredKeyContext = new TestKeyContext();
        final TestProcessingTimeService restoredProcessingTimeService = new TestProcessingTimeService();
        final HeapInternalTimerService<Integer, String> restoredTimerService =
                restoreTimerService(
                        snapshot,
                        snapshotVersion,
                        mockTriggerable2,
                        restoredKeyContext,
                        restoredProcessingTimeService,
                        testKeyGroupRange,
                        maxParallelism);

        restoredProcessingTimeService.setCurrentTime(10);
        restoredTimerService.advanceWatermark(10);

        Mockito.verify(mockTriggerable2, Mockito.times(2)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, key1, "ciao")));
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new InternalTimer<>(10, key2, "hello")));

        Mockito.verify(mockTriggerable2, Mockito.times(2)).onEventTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, key1, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onEventTime(Mockito.eq(new InternalTimer<>(10, key2, "ciao")));

        // Every restored timer has fired, so neither kind may remain.
        Assert.assertEquals(0, restoredTimerService.numEventTimeTimers());
        Assert.assertEquals(0, restoredTimerService.numProcessingTimeTimers());
    }

    /**
     * Checks that timers snapshotted from a single timer service can be restored into two
     * services, each owning one half of the original key-group range.
     *
     * <p>One key is picked from each half of {@code testKeyGroupRange}. Both keys get a
     * processing-time timer and an event-time timer at timestamp 10. After the restore, each
     * service is expected to fire exactly the two timers of the key inside its own sub range and
     * none of the timers of the other key.
     *
     * @param snapshotVersion version passed to {@link InternalTimersSnapshotReaderWriters} when
     *     writing the per-key-group snapshots and again when reading them back
     * @throws Exception if snapshotting or restoring fails; failures are propagated so that the
     *     test cannot pass on a broken snapshot
     */
    private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext = new TestKeyContext();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        HeapInternalTimerService<Integer, String> timerService = createTimerService(mockTriggerable, keyContext, processingTimeService, this.testKeyGroupRange, this.maxParallelism);

        // Split the tested range at its midpoint into two adjacent, non-overlapping sub ranges.
        int midpoint = this.testKeyGroupRange.getStartKeyGroup() + (this.testKeyGroupRange.getEndKeyGroup() - this.testKeyGroupRange.getStartKeyGroup()) / 2;
        KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(this.testKeyGroupRange.getStartKeyGroup(), midpoint);
        KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 1, this.testKeyGroupRange.getEndKeyGroup());

        // One key per sub range, so that each restored service ends up with exactly one key.
        int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, this.maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, this.maxParallelism);

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        Assert.assertEquals(2L, timerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, timerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1L, timerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(2L, timerService.numEventTimeTimers());
        Assert.assertEquals(1L, timerService.numEventTimeTimers("hello"));
        Assert.assertEquals(1L, timerService.numEventTimeTimers("ciao"));

        // Serialize every key group and file the bytes under the sub range that contains it.
        Map<Integer, byte[]> snapshot1 = new HashMap<>();
        Map<Integer, byte[]> snapshot2 = new HashMap<>();
        for (Integer keyGroupIndex : this.testKeyGroupRange) {
            // try-with-resources closes the stream and lets any failure reach the test runner.
            try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
                InternalTimersSnapshot<?, ?> timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex);
                InternalTimersSnapshotReaderWriters.getWriterForVersion(snapshotVersion, timersSnapshot).writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream));

                if (subKeyGroupRange1.contains(keyGroupIndex)) {
                    snapshot1.put(keyGroupIndex, outStream.toByteArray());
                } else if (subKeyGroupRange2.contains(keyGroupIndex)) {
                    snapshot2.put(keyGroupIndex, outStream.toByteArray());
                } else {
                    throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
                }
            }
        }

        // Restore each half into its own service with fresh mocks, key contexts and clocks.
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable1 = Mockito.mock(Triggerable.class);
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable2 = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext1 = new TestKeyContext();
        TestKeyContext keyContext2 = new TestKeyContext();
        TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService();
        TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService();
        HeapInternalTimerService<Integer, String> timerService1 = restoreTimerService(snapshot1, snapshotVersion, mockTriggerable1, keyContext1, processingTimeService1, subKeyGroupRange1, this.maxParallelism);
        HeapInternalTimerService<Integer, String> timerService2 = restoreTimerService(snapshot2, snapshotVersion, mockTriggerable2, keyContext2, processingTimeService2, subKeyGroupRange2, this.maxParallelism);

        // First half: only key1's timers may fire.
        processingTimeService1.setCurrentTime(10L);
        timerService1.advanceWatermark(10L);
        Mockito.verify(mockTriggerable1, Mockito.times(1)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable1, Mockito.times(1)).onProcessingTime(Mockito.eq(new InternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable1, Mockito.never()).onProcessingTime(Mockito.eq(new InternalTimer<>(10L, key2, "hello")));
        Mockito.verify(mockTriggerable1, Mockito.times(1)).onEventTime(anyInternalTimer());
        Mockito.verify(mockTriggerable1, Mockito.times(1)).onEventTime(Mockito.eq(new InternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable1, Mockito.never()).onEventTime(Mockito.eq(new InternalTimer<>(10L, key2, "ciao")));
        Assert.assertEquals(0L, timerService1.numEventTimeTimers());

        // Second half: only key2's timers may fire.
        processingTimeService2.setCurrentTime(10L);
        timerService2.advanceWatermark(10L);
        Mockito.verify(mockTriggerable2, Mockito.times(1)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.never()).onProcessingTime(Mockito.eq(new InternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable2, Mockito.times(1)).onProcessingTime(Mockito.eq(new InternalTimer<>(10L, key2, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(1)).onEventTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.never()).onEventTime(Mockito.eq(new InternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(1)).onEventTime(Mockito.eq(new InternalTimer<>(10L, key2, "ciao")));
        Assert.assertEquals(0L, timerService2.numEventTimeTimers());
    }

    /**
     * Draws random ints until one is found that {@link KeyGroupRangeAssignment#assignToKeyGroup}
     * places in the requested key group.
     *
     * <p>The generator is seeded with the current wall-clock time, so the returned key differs
     * between runs. The search has no iteration bound.
     *
     * @param keyGroup the key group the returned key must be assigned to
     * @param maxParallelism the max parallelism passed to the key-group assignment
     * @return a key that is assigned to {@code keyGroup}
     */
    private static int getKeyInKeyGroup(int keyGroup, int maxParallelism) {
        Random generator = new Random(System.currentTimeMillis());
        int candidate;
        do {
            candidate = generator.nextInt();
        } while (KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism) != keyGroup);
        return candidate;
    }

    /**
     * Draws random ints until one is found whose key group, as computed by
     * {@link KeyGroupRangeAssignment#assignToKeyGroup}, is contained in {@code range}.
     *
     * <p>The generator is seeded with the current wall-clock time, so the returned key differs
     * between runs. The search has no iteration bound.
     *
     * @param range the key-group range the returned key must fall into
     * @param maxParallelism the max parallelism passed to the key-group assignment
     * @return a key whose key group lies inside {@code range}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
        Random generator = new Random(System.currentTimeMillis());
        int candidate;
        boolean insideRange;
        do {
            candidate = generator.nextInt();
            int assignedGroup = KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism);
            insideRange = range.contains(assignedGroup);
        } while (!insideRange);
        return candidate;
    }

    /**
     * Creates a fresh {@link HeapInternalTimerService} for Integer keys and String namespaces and
     * starts it with the given trigger target.
     *
     * @param triggerable callback target handed to {@code startTimerService}
     * @param keyContext key context passed to the service constructor
     * @param processingTimeService processing-time service passed to the service constructor
     * @param keyGroupList key groups passed to the service constructor
     * @param maxParallelism max parallelism passed to the service constructor
     * @return the started timer service
     */
    private static HeapInternalTimerService<Integer, String> createTimerService(Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupsList keyGroupList, int maxParallelism) {
        HeapInternalTimerService<Integer, String> timerService =
            new HeapInternalTimerService<>(maxParallelism, keyGroupList, keyContext, processingTimeService);
        timerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return timerService;
    }

    /**
     * Creates a {@link HeapInternalTimerService}, restores the serialized per-key-group timer
     * snapshots from {@code state} into it, and then starts it.
     *
     * <p>Only key groups of {@code keyGroupsList} that have an entry in {@code state} are
     * restored; the others are skipped. The service is started only after all snapshots have
     * been restored.
     *
     * @param state serialized timer snapshots, keyed by key-group index
     * @param snapshotVersion version used to select the snapshot reader
     * @param triggerable callback target handed to {@code startTimerService}
     * @param keyContext key context passed to the service constructor
     * @param processingTimeService processing-time service passed to the service constructor
     * @param keyGroupsList key groups owned by the new service
     * @param maxParallelism max parallelism passed to the service constructor
     * @return the restored and started timer service
     * @throws Exception if reading or restoring any snapshot fails; the failure is propagated
     *     instead of being dropped so that a broken restore cannot go unnoticed
     */
    private static HeapInternalTimerService<Integer, String> restoreTimerService(Map<Integer, byte[]> state, int snapshotVersion, Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupsList keyGroupsList, int maxParallelism) throws Exception {
        HeapInternalTimerService<Integer, String> service =
            new HeapInternalTimerService<>(maxParallelism, keyGroupsList, keyContext, processingTimeService);

        for (Integer keyGroupIndex : keyGroupsList) {
            if (!state.containsKey(keyGroupIndex)) {
                continue;
            }
            // try-with-resources closes the stream and rethrows any read/restore failure.
            try (ByteArrayInputStream inputStream = new ByteArrayInputStream(state.get(keyGroupIndex))) {
                InternalTimersSnapshot<?, ?> restoredTimersSnapshot = InternalTimersSnapshotReaderWriters
                    .getReaderForVersion(snapshotVersion, HeapInternalTimerServiceTest.class.getClassLoader())
                    .readTimersSnapshot(new DataInputViewStreamWrapper(inputStream));
                service.restoreTimersForKeyGroup(restoredTimersSnapshot, keyGroupIndex);
            }
        }

        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return service;
    }

    @Parameterized.Parameters(name="start = {0}, end = {1}, max = {2}")
    public static Collection<Object[]> keyRanges() {
        return Arrays.asList({0, 32766, (short)Short.MAX_VALUE}, {0, 10, (short)Short.MAX_VALUE}, {0, 10, 10}, {10, 32766, (short)Short.MAX_VALUE}, {2, 5, 100}, {2, 5, 6});
    }

    private static class TestKeyContext
    implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object key) {
            this.key = key;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }
}

