/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

/**
 * Shared test scenarios for {@link GenericWriteAheadSink} implementations.
 *
 * <p>Subclasses provide the sink under test, the values fed into it and the
 * verification of what the sink ended up with; this class only drives a
 * {@link OneInputStreamOperatorTestHarness} through element processing,
 * snapshots, checkpoint notifications and state restores.
 *
 * @param <IN> type of the elements written by the sink
 * @param <S> concrete sink type under test
 */
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>>
extends TestLogger {
    /**
     * Creates a fresh sink instance; called once per harness in every test.
     *
     * @return a new, not yet opened sink
     * @throws Exception if the sink cannot be created
     */
    protected abstract S createSink() throws Exception;

    /**
     * Supplies the type information of the sink's input elements.
     * It is not used by the scenarios in this class itself.
     *
     * @return type information for {@code IN}
     */
    protected abstract TypeInformation<IN> createTypeInfo();

    /**
     * Builds the element that is fed to the sink.
     *
     * @param counter running element counter; the tests start at 1 and increment by one per element
     * @param checkpointID batch index supplied by the tests (0, 1, 2 in the single-sink tests,
     *     always 0 in the rescaling tests)
     * @return the value to wrap into a {@link StreamRecord}
     */
    protected abstract IN generateValue(int counter, int checkpointID);

    /**
     * Checks the sink after {@link #testIdealCircumstances()} has run.
     *
     * @param sink the sink that processed the elements
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsIdealCircumstances(S sink) throws Exception;

    /**
     * Checks the sink after {@link #testDataPersistenceUponMissedNotify()} has run.
     *
     * @param sink the sink that processed the elements
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataPersistenceUponMissedNotify(S sink) throws Exception;

    /**
     * Checks the sink after {@link #testDataDiscardingUponRestore()} has run.
     *
     * @param sink the sink that was restored from the snapshot
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataDiscardingUponRestore(S sink) throws Exception;

    /**
     * Checks a sink in the rescaling tests.
     *
     * <p>NOTE(review): the bounds look like an inclusive range of element counters
     * (the tests pass e.g. {@code 1, 33} after feeding 33 elements, and {@code 0, -1}
     * where no checkpoint has been notified) - confirm against the implementations.
     *
     * @param sink the sink to inspect
     * @param startElementCounter first expected element counter
     * @param endElementCounter last expected element counter
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsWhenReScaling(S sink, int startElementCounter, int endElementCounter) throws Exception;

    /**
     * Three batches of 20 elements, each followed by a snapshot and the matching
     * checkpoint-complete notification (checkpoint ids 0, 1 and 2).
     */
    @Test
    public void testIdealCircumstances() throws Exception {
        S sink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness = new OneInputStreamOperatorTestHarness<>(sink);
        testHarness.open();

        int elementCounter = 1;
        for (int checkpointId = 0; checkpointId < 3; checkpointId++) {
            elementCounter = processElements(testHarness, 20, elementCounter, checkpointId);
            testHarness.snapshot(checkpointId, 0L);
            testHarness.notifyOfCompletedCheckpoint(checkpointId);
        }

        verifyResultsIdealCircumstances(sink);
        // Release the harness like the rescaling tests do.
        testHarness.close();
    }

    /**
     * Three batches of 20 elements with snapshots 0, 1 and 2, where the notification
     * for checkpoint 1 is deliberately never delivered.
     */
    @Test
    public void testDataPersistenceUponMissedNotify() throws Exception {
        S sink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness = new OneInputStreamOperatorTestHarness<>(sink);
        testHarness.open();

        int elementCounter = 1;

        elementCounter = processElements(testHarness, 20, elementCounter, 0);
        testHarness.snapshot(0, 0L);
        testHarness.notifyOfCompletedCheckpoint(0);

        elementCounter = processElements(testHarness, 20, elementCounter, 1);
        // Checkpoint 1 is snapshotted but its notification is skipped on purpose.
        testHarness.snapshot(1, 0L);

        elementCounter = processElements(testHarness, 20, elementCounter, 2);
        testHarness.snapshot(2, 0L);
        testHarness.notifyOfCompletedCheckpoint(2);

        verifyResultsDataPersistenceUponMissedNotify(sink);
        // Release the harness like the rescaling tests do.
        testHarness.close();
    }

    /**
     * Feeds a checkpointed batch and a second batch that is never snapshotted, then
     * restores a new sink from the first snapshot, feeds a third batch and completes
     * checkpoint 1 on the restored sink.
     */
    @Test
    public void testDataDiscardingUponRestore() throws Exception {
        S sink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness = new OneInputStreamOperatorTestHarness<>(sink);
        testHarness.open();

        int elementCounter = 1;

        elementCounter = processElements(testHarness, 20, elementCounter, 0);
        OperatorStateHandles latestSnapshot = testHarness.snapshot(0, 0L);
        testHarness.notifyOfCompletedCheckpoint(0);

        // This batch is processed after the last snapshot and is not part of it.
        elementCounter = processElements(testHarness, 20, elementCounter, 1);
        testHarness.close();

        S restoredSink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> restoredHarness = new OneInputStreamOperatorTestHarness<>(restoredSink);
        restoredHarness.setup();
        restoredHarness.initializeState(latestSnapshot);
        restoredHarness.open();

        elementCounter = processElements(restoredHarness, 20, elementCounter, 2);
        // The restored harness continues with checkpoint id 1, as in the original sequence.
        restoredHarness.snapshot(1, 0L);
        restoredHarness.notifyOfCompletedCheckpoint(1);

        verifyResultsDataDiscardingUponRestore(restoredSink);
        // The first harness was closed above; release the restored one as well.
        restoredHarness.close();
    }

    /**
     * Runs two subtasks (parallelism 2), merges their snapshots and restores them into
     * a single subtask (parallelism 1) that processes further elements.
     */
    @Test
    public void testScalingDown() throws Exception {
        S sink1 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness1 = new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
        testHarness1.open();

        S sink2 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness2 = new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
        testHarness2.open();

        int elementCounter = 1;
        int snapshotCount = 0;

        elementCounter = processElements(testHarness1, 10, elementCounter, 0);
        elementCounter = processElements(testHarness2, 11, elementCounter, 0);

        // Both subtasks snapshot under the same checkpoint id.
        OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0L);
        OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0L);
        OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(snapshot1, snapshot2);

        testHarness1.close();
        testHarness2.close();

        S sink3 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> mergedTestHarness = new OneInputStreamOperatorTestHarness<>(sink3, 10, 1, 0);
        mergedTestHarness.setup();
        mergedTestHarness.initializeState(mergedSnapshot);
        mergedTestHarness.open();

        elementCounter = processElements(mergedTestHarness, 12, elementCounter, 0);

        snapshotCount++;
        mergedTestHarness.snapshot(snapshotCount, 1L);
        mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);

        // 10 + 11 + 12 elements were fed in total, numbered 1..33.
        verifyResultsWhenReScaling(sink3, 1, 33);
        mergedTestHarness.close();
    }

    /**
     * Runs one subtask (parallelism 1) through two un-notified snapshots, then restores
     * the latest snapshot into both subtasks of a parallelism-2 setup and verifies each.
     */
    @Test
    public void testScalingUp() throws Exception {
        S sink1 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness1 = new OneInputStreamOperatorTestHarness<>(sink1, 10, 1, 0);

        int elementCounter = 1;
        int snapshotCount = 0;

        testHarness1.open();

        elementCounter = processElements(testHarness1, 10, elementCounter, 0);
        snapshotCount++;
        testHarness1.snapshot(snapshotCount, 0L);

        elementCounter = processElements(testHarness1, 11, elementCounter, 0);
        snapshotCount++;
        OperatorStateHandles snapshot = testHarness1.snapshot(snapshotCount, 0L);
        testHarness1.close();

        // No checkpoint was ever notified on the first sink.
        verifyResultsWhenReScaling(sink1, 0, -1);

        snapshotCount++;

        S sink2 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness2 = new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 0);
        testHarness2.setup();
        testHarness2.initializeState(snapshot);
        testHarness2.open();

        testHarness2.notifyOfCompletedCheckpoint(snapshotCount);
        verifyResultsWhenReScaling(sink2, 1, 10);

        S sink3 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness3 = new OneInputStreamOperatorTestHarness<>(sink3, 10, 2, 1);
        testHarness3.setup();
        testHarness3.initializeState(snapshot);
        testHarness3.open();

        elementCounter = processElements(testHarness3, 10, elementCounter, 0);

        testHarness3.snapshot(snapshotCount, 1L);
        testHarness3.notifyOfCompletedCheckpoint(snapshotCount);
        verifyResultsWhenReScaling(sink3, 11, 31);

        testHarness2.close();
        testHarness3.close();
    }

    /**
     * Feeds {@code count} consecutively numbered generated elements into the harness.
     *
     * @param harness harness to feed
     * @param count number of elements to process
     * @param firstElementCounter counter of the first element to generate
     * @param checkpointID second argument forwarded to {@link #generateValue(int, int)}
     * @return the counter for the next element, i.e. {@code firstElementCounter + count}
     * @throws Exception if the operator fails while processing an element
     */
    private int processElements(
            OneInputStreamOperatorTestHarness<IN, IN> harness,
            int count,
            int firstElementCounter,
            int checkpointID) throws Exception {
        int elementCounter = firstElementCounter;
        for (int i = 0; i < count; i++) {
            harness.processElement(new StreamRecord<>(generateValue(elementCounter, checkpointID)));
            elementCounter++;
        }
        return elementCounter;
    }
}

