/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestMemoryCheckpointOutputStream;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link DuplicatingCheckpointOutputStream}.
 *
 * <p>The tests cover three scenarios:
 * <ul>
 *   <li>both wrapped streams work and must end up with identical content,</li>
 *   <li>the secondary stream fails: the primary stream must stay usable, and the failure must be
 *       reported when the secondary handle is requested,</li>
 *   <li>the primary stream fails: the failure must be propagated to the caller as an
 *       {@link IOException}.</li>
 * </ul>
 */
public class DuplicatingCheckpointOutputStreamTest extends TestLogger {

    /** Capacity (1 MiB) handed to every in-memory checkpoint stream created by these tests. */
    private static final int STREAM_CAPACITY = 0x100000;

    /**
     * Third constructor argument of {@link DuplicatingCheckpointOutputStream} used by most tests.
     * NOTE(review): assumed to be the size of the internal write buffer - confirm against the
     * constructor. The failure tests deliberately write more bytes than this value.
     */
    private static final int BUFFER_SIZE = 64;

    /**
     * Applies the same pseudo-random sequence of writes (single byte, whole array, array slice) to
     * a reference stream and to the duplicating stream. The positions must agree after every
     * write, and both the primary and the secondary handle must contain exactly the bytes of the
     * reference stream.
     */
    @Test
    public void testDuplicatedWrite() throws Exception {
        TestMemoryCheckpointOutputStream primaryStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        TestMemoryCheckpointOutputStream secondaryStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        TestMemoryCheckpointOutputStream referenceStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        DuplicatingCheckpointOutputStream duplicatingStream =
                new DuplicatingCheckpointOutputStream(primaryStream, secondaryStream, BUFFER_SIZE);

        // Fixed seed keeps the sequence of writes reproducible between runs.
        Random random = new Random(42L);
        for (int i = 0; i < 500; ++i) {
            int choice = random.nextInt(3);
            if (choice == 0) {
                // Single-byte write.
                int val = random.nextInt();
                referenceStream.write(val);
                duplicatingStream.write(val);
            } else {
                byte[] bytes = new byte[random.nextInt(128)];
                random.nextBytes(bytes);
                if (choice == 1) {
                    // Whole-array write.
                    referenceStream.write(bytes);
                    duplicatingStream.write(bytes);
                } else {
                    // Slice write; an empty array can only be written as the empty slice.
                    int off = bytes.length > 0 ? random.nextInt(bytes.length) : 0;
                    int len = bytes.length > 0 ? random.nextInt(bytes.length - off) : 0;
                    referenceStream.write(bytes, off, len);
                    duplicatingStream.write(bytes, off, len);
                }
            }
            Assert.assertEquals(
                    "Position diverged from the reference stream after write #" + i,
                    referenceStream.getPos(),
                    duplicatingStream.getPos());
        }

        StreamStateHandle refStateHandle = referenceStream.closeAndGetHandle();
        StreamStateHandle primaryStateHandle = duplicatingStream.closeAndGetPrimaryHandle();
        StreamStateHandle secondaryStateHandle = duplicatingStream.closeAndGetSecondaryHandle();

        Assert.assertTrue(
                "Primary stream content differs from the reference stream.",
                CommonTestUtils.isSteamContentEqual(
                        refStateHandle.openInputStream(), primaryStateHandle.openInputStream()));
        Assert.assertTrue(
                "Secondary stream content differs from the reference stream.",
                CommonTestUtils.isSteamContentEqual(
                        refStateHandle.openInputStream(), secondaryStateHandle.openInputStream()));

        refStateHandle.discardState();
        primaryStateHandle.discardState();
        secondaryStateHandle.discardState();
    }

    /** Secondary stream fails while 128 single bytes (more than {@link #BUFFER_SIZE}) are written. */
    @Test
    public void testSecondaryWriteFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingSecondary();
        testFailingSecondaryStream(duplicatingStream, () -> {
            for (int i = 0; i < 128; ++i) {
                duplicatingStream.write(42);
            }
        });
    }

    /** Secondary stream fails while a 512-byte array is written. */
    @Test
    public void testFailingSecondaryWriteArrayFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingSecondary();
        testFailingSecondaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512]));
    }

    /** Secondary stream fails while a 130-byte slice of an array is written. */
    @Test
    public void testFailingSecondaryWriteArrayOffsFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingSecondary();
        testFailingSecondaryStream(
                duplicatingStream, () -> duplicatingStream.write(new byte[512], 20, 130));
    }

    /** Secondary stream fails on {@code flush()}. */
    @Test
    public void testFailingSecondaryFlush() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingSecondary();
        testFailingSecondaryStream(duplicatingStream, duplicatingStream::flush);
    }

    /** Secondary stream fails on {@code sync()}. */
    @Test
    public void testFailingSecondarySync() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingSecondary();
        testFailingSecondaryStream(duplicatingStream, duplicatingStream::sync);
    }

    /** Primary stream fails while 128 single bytes (more than {@link #BUFFER_SIZE}) are written. */
    @Test
    public void testPrimaryWriteFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingPrimary();
        testFailingPrimaryStream(duplicatingStream, () -> {
            for (int i = 0; i < 128; ++i) {
                duplicatingStream.write(42);
            }
        });
    }

    /** Primary stream fails while a 512-byte array is written. */
    @Test
    public void testFailingPrimaryWriteArrayFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingPrimary();
        testFailingPrimaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512]));
    }

    /** Primary stream fails while a 130-byte slice of an array is written. */
    @Test
    public void testFailingPrimaryWriteArrayOffsFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingPrimary();
        testFailingPrimaryStream(
                duplicatingStream, () -> duplicatingStream.write(new byte[512], 20, 130));
    }

    /** Primary stream fails on {@code flush()}. */
    @Test
    public void testFailingPrimaryFlush() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingPrimary();
        testFailingPrimaryStream(duplicatingStream, duplicatingStream::flush);
    }

    /** Primary stream fails on {@code sync()}. */
    @Test
    public void testFailingPrimarySync() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream =
                createDuplicatingStreamWithFailingPrimary();
        testFailingPrimaryStream(duplicatingStream, duplicatingStream::sync);
    }

    /**
     * Runs {@code testMethod} against a duplicating stream whose secondary stream always fails and
     * verifies the expected behaviour.
     *
     * <p>The operation itself and a subsequent single-byte write must not throw. Afterwards the
     * secondary stream must have been closed, the primary handle (if any) must have the size
     * reported by {@code getPos()}, and requesting the secondary handle must throw an
     * {@link IOException} whose cause is the recorded secondary stream exception.
     *
     * @param duplicatingStream stream created by {@link #createDuplicatingStreamWithFailingSecondary()}
     * @param testMethod operation that triggers the failure in the secondary stream
     * @throws Exception if an operation that is expected to succeed fails
     */
    private void testFailingSecondaryStream(
            DuplicatingCheckpointOutputStream duplicatingStream,
            StreamTestMethod testMethod) throws Exception {

        testMethod.call();

        // The duplicating stream must remain writable after the secondary stream has failed.
        duplicatingStream.write(42);

        FailingCheckpointOutStream secondary =
                (FailingCheckpointOutStream) duplicatingStream.getSecondaryOutputStream();
        Assert.assertTrue(
                "Secondary stream should have been closed after its failure.",
                secondary.isClosed());

        long pos = duplicatingStream.getPos();
        StreamStateHandle primaryHandle = duplicatingStream.closeAndGetPrimaryHandle();
        if (primaryHandle != null) {
            Assert.assertEquals(pos, primaryHandle.getStateSize());
        }

        try {
            duplicatingStream.closeAndGetSecondaryHandle();
            Assert.fail("Expected an IOException when requesting the handle of the failed secondary stream.");
        } catch (IOException ioEx) {
            Assert.assertEquals(duplicatingStream.getSecondaryStreamException(), ioEx.getCause());
        }
    }

    /**
     * Runs {@code testMethod} against a duplicating stream whose primary stream always fails and
     * verifies that the operation throws an {@link IOException}. The stream is closed quietly
     * afterwards, whatever the outcome.
     *
     * @param duplicatingStream stream created by {@link #createDuplicatingStreamWithFailingPrimary()}
     * @param testMethod operation that triggers the failure in the primary stream
     * @throws Exception if the operation fails with anything other than an {@link IOException}
     */
    private void testFailingPrimaryStream(
            DuplicatingCheckpointOutputStream duplicatingStream,
            StreamTestMethod testMethod) throws Exception {
        try {
            testMethod.call();
            Assert.fail("Expected an IOException because the primary stream fails.");
        } catch (IOException expected) {
            // Expected: a failure of the primary stream must surface to the caller.
        } finally {
            IOUtils.closeQuietly(duplicatingStream);
        }
    }

    /**
     * Wraps two streams that start at different positions (one byte was already written to the
     * primary). The secondary stream must be marked as failed and closed right after construction,
     * while the primary stream keeps working and ends up with both bytes.
     */
    @Test
    public void testUnalignedStreamsException() throws IOException {
        TestMemoryCheckpointOutputStream primaryStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        TestMemoryCheckpointOutputStream secondaryStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);

        // Move the primary stream ahead so that the two positions no longer match.
        primaryStream.write(42);

        DuplicatingCheckpointOutputStream stream =
                new DuplicatingCheckpointOutputStream(primaryStream, secondaryStream);

        Assert.assertNotNull(
                "Unaligned streams should be recorded as a secondary stream failure.",
                stream.getSecondaryStreamException());
        Assert.assertTrue(
                "Secondary stream should have been closed.",
                secondaryStream.isClosed());

        stream.write(23);

        try {
            stream.closeAndGetSecondaryHandle();
            Assert.fail("Expected an IOException when requesting the handle of the failed secondary stream.");
        } catch (IOException ioEx) {
            Assert.assertEquals(stream.getSecondaryStreamException(), ioEx.getCause());
        }

        StreamStateHandle primaryHandle = stream.closeAndGetPrimaryHandle();
        try (FSDataInputStream inputStream = primaryHandle.openInputStream()) {
            Assert.assertEquals(42L, inputStream.read());
            Assert.assertEquals(23L, inputStream.read());
            Assert.assertEquals(-1L, inputStream.read());
        }
    }

    /**
     * Creates a duplicating stream with an in-memory primary stream and an always-failing
     * secondary stream.
     */
    private DuplicatingCheckpointOutputStream createDuplicatingStreamWithFailingSecondary()
            throws IOException {
        TestMemoryCheckpointOutputStream primaryStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        FailingCheckpointOutStream failSecondaryStream = new FailingCheckpointOutStream();
        return new DuplicatingCheckpointOutputStream(primaryStream, failSecondaryStream, BUFFER_SIZE);
    }

    /**
     * Creates a duplicating stream with an always-failing primary stream and an in-memory
     * secondary stream.
     */
    private DuplicatingCheckpointOutputStream createDuplicatingStreamWithFailingPrimary()
            throws IOException {
        FailingCheckpointOutStream failPrimaryStream = new FailingCheckpointOutStream();
        TestMemoryCheckpointOutputStream secondary =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        return new DuplicatingCheckpointOutputStream(failPrimaryStream, secondary, BUFFER_SIZE);
    }

    /** A stream operation under test; like {@link Runnable}, but may throw {@link IOException}. */
    @FunctionalInterface
    private interface StreamTestMethod {
        void call() throws IOException;
    }

    /**
     * Checkpoint output stream whose write, flush, sync and close-and-get-handle operations always
     * throw an {@link IOException}. Only {@link #close()} succeeds, and it records that it was
     * called so tests can check that the stream was closed.
     */
    private static class FailingCheckpointOutStream
            extends CheckpointStreamFactory.CheckpointStateOutputStream {

        /** Set once {@link #close()} has been called. */
        private boolean closed = false;

        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new IOException();
        }

        /** Always reports position zero. */
        @Override
        public long getPos() throws IOException {
            return 0L;
        }

        @Override
        public void write(int b) throws IOException {
            throw new IOException();
        }

        @Override
        public void flush() throws IOException {
            throw new IOException();
        }

        @Override
        public void sync() throws IOException {
            throw new IOException();
        }

        @Override
        public void close() throws IOException {
            this.closed = true;
        }

        /** Returns whether {@link #close()} has been called on this stream. */
        public boolean isClosed() {
            return this.closed;
        }
    }
}

