/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.ttl;

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.runtime.state.ttl.MockTtlStateTest;
import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
import org.apache.flink.runtime.state.ttl.StateBackendTestContext;
import org.apache.flink.runtime.state.ttl.TtlAggregatingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlFoldingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlListStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlMapStateAllEntriesTestContext;
import org.apache.flink.runtime.state.ttl.TtlMapStatePerElementTestContext;
import org.apache.flink.runtime.state.ttl.TtlMapStatePerNullElementTestContext;
import org.apache.flink.runtime.state.ttl.TtlMergingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlReducingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlStateTestContextBase;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValueStateTestContext;
import org.apache.flink.util.StateMigrationException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public abstract class TtlStateTestBase {
    private static final long TTL = 100L;
    private MockTtlTimeProvider timeProvider;
    private StateBackendTestContext sbetc;
    private StateTtlConfig ttlConfig;
    @Parameterized.Parameter
    public TtlStateTestContextBase<?, ?, ?> ctx;

    @Before
    public void setup() {
        this.timeProvider = new MockTtlTimeProvider();
        this.sbetc = this.createStateBackendTestContext(this.timeProvider);
    }

    protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider var1);

    @Parameterized.Parameters(name="{0}")
    public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
        return Arrays.asList(new TtlValueStateTestContext(), new TtlListStateTestContext(), new TtlMapStateAllEntriesTestContext(), new TtlMapStatePerElementTestContext(), new TtlMapStatePerNullElementTestContext(), new TtlAggregatingStateTestContext(), new TtlReducingStateTestContext(), new TtlFoldingStateTestContext());
    }

    private <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
        return this.ctx;
    }

    private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
        return (TtlMergingStateTestContext)this.ctx;
    }

    private void initTest() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
    }

    private void initTest(StateTtlConfig.UpdateType updateType, StateTtlConfig.StateVisibility visibility) throws Exception {
        this.initTest(updateType, visibility, 100L);
    }

    private void initTest(StateTtlConfig.UpdateType updateType, StateTtlConfig.StateVisibility visibility, long ttl) throws Exception {
        this.initTest(TtlStateTestBase.getConfBuilder(ttl).setUpdateType(updateType).setStateVisibility(visibility).build());
    }

    private static StateTtlConfig.Builder getConfBuilder(long ttl) {
        return StateTtlConfig.newBuilder((Time)Time.milliseconds((long)ttl));
    }

    private void initTest(StateTtlConfig ttlConfig) throws Exception {
        this.ttlConfig = ttlConfig;
        this.sbetc.createAndRestoreKeyedStateBackend();
        this.sbetc.restoreSnapshot(null);
        this.createState();
        this.ctx().initTestValues();
    }

    private <S extends State> void createState() throws Exception {
        StateDescriptor stateDescriptor = this.ctx().createStateDescriptor();
        stateDescriptor.enableTimeToLive(this.ttlConfig);
        this.ctx().ttlState = (InternalKvState)this.sbetc.createState(stateDescriptor, "defaultNamespace");
    }

    private void takeAndRestoreSnapshot() throws Exception {
        KeyedStateHandle snapshot = this.sbetc.takeSnapshot();
        this.sbetc.createAndRestoreKeyedStateBackend();
        this.sbetc.restoreSnapshot(snapshot);
        this.createState();
    }

    @Test
    public void testNonExistentValue() throws Exception {
        this.initTest();
        Assert.assertEquals((String)"Non-existing state should be empty", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testExactExpirationOnWrite() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 20L;
        Assert.assertEquals((String)"Unexpired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        this.ctx().update(this.ctx().updateUnexpired);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals((String)"Unexpired state should be available after update", this.ctx().getUnexpired, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        this.ctx().update(this.ctx().updateExpired);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 220L;
        Assert.assertEquals((String)"Unexpired state should be available after update", this.ctx().getUpdateExpired, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 300L;
        Assert.assertEquals((String)"Expired state should be unavailable", this.ctx().emptyValue, this.ctx().get());
        Assert.assertEquals((String)"Original state should be cleared on access", this.ctx().emptyValue, (Object)this.ctx().getOriginal());
    }

    @Test
    public void testRelaxedExpirationOnWrite() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals((String)"Expired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
        Assert.assertEquals((String)"Original state should be cleared on access", this.ctx().emptyValue, (Object)this.ctx().getOriginal());
        Assert.assertEquals((String)"Expired state should be cleared on access", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testExactExpirationOnRead() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnReadAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals((String)"Unexpired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals((String)"Unexpired state should be available after read", this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 250L;
        Assert.assertEquals((String)"Expired state should be unavailable", this.ctx().emptyValue, this.ctx().get());
        Assert.assertEquals((String)"Original state should be cleared on access", this.ctx().emptyValue, (Object)this.ctx().getOriginal());
    }

    @Test
    public void testRelaxedExpirationOnRead() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnReadAndWrite, StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals((String)"Unexpired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        Assert.assertEquals((String)"Expired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
        Assert.assertEquals((String)"Expired state should be cleared on access", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testExpirationTimestampOverflow() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired, Long.MAX_VALUE);
        this.timeProvider.time = 10L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals((String)"Unexpired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
    }

    @Test
    public void testMergeNamespaces() throws Exception {
        Assume.assumeThat(this.ctx, (Matcher)CoreMatchers.instanceOf(TtlMergingStateTestContext.class));
        this.initTest();
        this.timeProvider.time = 0L;
        List expiredUpdatesToMerge = this.mctx().generateExpiredUpdatesToMerge();
        this.mctx().applyStateUpdates(expiredUpdatesToMerge);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        List unexpiredUpdatesToMerge = this.mctx().generateUnexpiredUpdatesToMerge();
        this.mctx().applyStateUpdates(unexpiredUpdatesToMerge);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 150L;
        List finalUpdatesToMerge = this.mctx().generateFinalUpdatesToMerge();
        this.mctx().applyStateUpdates(finalUpdatesToMerge);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 230L;
        ((InternalMergingState)this.mctx().ttlState).mergeNamespaces((Object)"targetNamespace", TtlMergingStateTestContext.NAMESPACES);
        ((InternalMergingState)this.mctx().ttlState).setCurrentNamespace((Object)"targetNamespace");
        Assert.assertEquals((String)"Unexpected result of merge operation", this.mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), this.mctx().get());
    }

    @Test
    public void testMultipleKeys() throws Exception {
        this.testMultipleStateIdsWithSnapshotCleanup(id -> this.sbetc.setCurrentKey((String)id));
    }

    @Test
    public void testMultipleNamespaces() throws Exception {
        this.testMultipleStateIdsWithSnapshotCleanup(id -> this.ctx().ttlState.setCurrentNamespace(id));
    }

    private void testMultipleStateIdsWithSnapshotCleanup(Consumer<String> idChanger) throws Exception {
        this.initTest();
        this.testMultipleStateIds(idChanger, false);
        this.initTest(TtlStateTestBase.getConfBuilder(100L).cleanupFullSnapshot().build());
        this.testMultipleStateIds(idChanger, true);
    }

    private void testMultipleStateIds(Consumer<String> idChanger, boolean timeBackAfterRestore) throws Exception {
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 0L;
        idChanger.accept("id2");
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        idChanger.accept("id1");
        this.ctx().update(this.ctx().updateEmpty);
        idChanger.accept("id2");
        this.ctx().update(this.ctx().updateUnexpired);
        this.timeProvider.time = 120L;
        this.takeAndRestoreSnapshot();
        idChanger.accept("id1");
        Assert.assertEquals((String)"Unexpired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
        idChanger.accept("id2");
        Assert.assertEquals((String)"Unexpired state should be available after update", this.ctx().getUnexpired, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        idChanger.accept("id2");
        this.ctx().update(this.ctx().updateExpired);
        this.timeProvider.time = 230L;
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = timeBackAfterRestore ? 170L : this.timeProvider.time;
        idChanger.accept("id1");
        Assert.assertEquals((String)"Expired state should be unavailable", this.ctx().emptyValue, this.ctx().get());
        idChanger.accept("id2");
        Assert.assertEquals((String)"Unexpired state should be available after update", this.ctx().getUpdateExpired, this.ctx().get());
        this.timeProvider.time = 300L;
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = timeBackAfterRestore ? 230L : this.timeProvider.time;
        idChanger.accept("id1");
        Assert.assertEquals((String)"Expired state should be unavailable", this.ctx().emptyValue, this.ctx().get());
        idChanger.accept("id2");
        Assert.assertEquals((String)"Expired state should be unavailable", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testSnapshotChangeRestore() throws Exception {
        this.initTest();
        this.timeProvider.time = 0L;
        this.sbetc.setCurrentKey("k1");
        this.ctx().update(this.ctx().updateEmpty);
        this.timeProvider.time = 50L;
        this.sbetc.setCurrentKey("k1");
        this.ctx().update(this.ctx().updateUnexpired);
        this.timeProvider.time = 100L;
        this.sbetc.setCurrentKey("k2");
        this.ctx().update(this.ctx().updateEmpty);
        KeyedStateHandle snapshot = this.sbetc.takeSnapshot();
        this.timeProvider.time = 170L;
        this.sbetc.setCurrentKey("k1");
        this.ctx().update(this.ctx().updateExpired);
        this.sbetc.setCurrentKey("k2");
        this.ctx().update(this.ctx().updateUnexpired);
        this.sbetc.createAndRestoreKeyedStateBackend();
        this.sbetc.restoreSnapshot(snapshot);
        this.createState();
        this.timeProvider.time = 180L;
        this.sbetc.setCurrentKey("k1");
        Assert.assertEquals((String)"Expired state should be unavailable", this.ctx().emptyValue, this.ctx().get());
        this.sbetc.setCurrentKey("k2");
        Assert.assertEquals((String)"Unexpired state should be available", this.ctx().getUpdateEmpty, this.ctx().get());
    }

    @Test(expected=StateMigrationException.class)
    public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
        Assume.assumeThat((Object)this, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.instanceOf(MockTtlStateTest.class)));
        this.initTest();
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        KeyedStateHandle snapshot = this.sbetc.takeSnapshot();
        this.sbetc.createAndRestoreKeyedStateBackend();
        this.sbetc.restoreSnapshot(snapshot);
        this.sbetc.createState(this.ctx().createStateDescriptor(), "");
    }

    @After
    public void tearDown() {
        this.sbetc.disposeKeyedStateBackend();
    }
}

