/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageTestBase;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class FsCheckpointStorageTest
extends AbstractFileCheckpointStorageTestBase {
    private static final int FILE_SIZE_THRESHOLD = 1024;

    @Override
    protected CheckpointStorage createCheckpointStorage(Path checkpointDir) throws Exception {
        return new FsCheckpointStorage(checkpointDir, null, new JobID(), 1024);
    }

    @Override
    protected CheckpointStorage createCheckpointStorageWithSavepointDir(Path checkpointDir, Path savepointDir) throws Exception {
        return new FsCheckpointStorage(checkpointDir, savepointDir, new JobID(), 1024);
    }

    @Test
    public void testSavepointsInOneDirectoryDefaultLocation() throws Exception {
        Path defaultSavepointDir = Path.fromLocalFile((File)this.tmp.newFolder());
        FsCheckpointStorage storage = new FsCheckpointStorage(Path.fromLocalFile((File)this.tmp.newFolder()), defaultSavepointDir, new JobID(), 1024);
        FsCheckpointStorageLocation savepointLocation = (FsCheckpointStorageLocation)storage.initializeLocationForSavepoint(52452L, null);
        this.assertParent(defaultSavepointDir, savepointLocation.getCheckpointDirectory());
        this.assertParent(defaultSavepointDir, savepointLocation.getSharedStateDirectory());
        this.assertParent(defaultSavepointDir, savepointLocation.getTaskOwnedStateDirectory());
        savepointLocation.disposeOnFailure();
    }

    @Test
    public void testSavepointsInOneDirectoryCustomLocation() throws Exception {
        Path savepointDir = Path.fromLocalFile((File)this.tmp.newFolder());
        FsCheckpointStorage storage = new FsCheckpointStorage(Path.fromLocalFile((File)this.tmp.newFolder()), null, new JobID(), 1024);
        FsCheckpointStorageLocation savepointLocation = (FsCheckpointStorageLocation)storage.initializeLocationForSavepoint(52452L, savepointDir.toString());
        this.assertParent(savepointDir, savepointLocation.getCheckpointDirectory());
        this.assertParent(savepointDir, savepointLocation.getSharedStateDirectory());
        this.assertParent(savepointDir, savepointLocation.getTaskOwnedStateDirectory());
        savepointLocation.disposeOnFailure();
    }

    @Test
    public void testTaskOwnedStateStream() throws Exception {
        StreamStateHandle stateHandle;
        List<String> state = Arrays.asList("Flopsy", "Mopsy", "Cotton Tail", "Peter");
        FsCheckpointStorage storage = new FsCheckpointStorage(Path.fromLocalFile((File)this.tmp.newFolder()), null, new JobID(), 10);
        try (CheckpointStreamFactory.CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream();){
            Assert.assertTrue((boolean)(stream instanceof FsCheckpointStreamFactory.FsCheckpointStateOutputStream));
            new ObjectOutputStream((OutputStream)stream).writeObject(state);
            stateHandle = stream.closeAndGetHandle();
        }
        FileStateHandle fileStateHandle = (FileStateHandle)stateHandle;
        String parentDirName = fileStateHandle.getFilePath().getParent().getName();
        Assert.assertEquals((Object)"taskowned", (Object)parentDirName);
        try (ObjectInputStream in = new ObjectInputStream((InputStream)stateHandle.openInputStream());){
            Assert.assertEquals(state, (Object)in.readObject());
        }
    }

    @Test
    public void testDirectoriesForExclusiveAndSharedState() throws Exception {
        LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        Path checkpointDir = this.randomTempPath();
        Path sharedStateDir = this.randomTempPath();
        FsCheckpointStorageLocation storageLocation = new FsCheckpointStorageLocation((FileSystem)fs, checkpointDir, sharedStateDir, this.randomTempPath(), CheckpointStorageLocationReference.getDefault(), 1024);
        Assert.assertNotEquals((Object)storageLocation.getCheckpointDirectory(), (Object)storageLocation.getSharedStateDirectory());
        Assert.assertEquals((long)0L, (long)fs.listStatus(storageLocation.getCheckpointDirectory()).length);
        Assert.assertEquals((long)0L, (long)fs.listStatus(storageLocation.getSharedStateDirectory()).length);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream exclusiveStream = storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        exclusiveStream.write(42);
        exclusiveStream.flush();
        StreamStateHandle exclusiveHandle = exclusiveStream.closeAndGetHandle();
        Assert.assertEquals((long)1L, (long)fs.listStatus(storageLocation.getCheckpointDirectory()).length);
        Assert.assertEquals((long)0L, (long)fs.listStatus(storageLocation.getSharedStateDirectory()).length);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream sharedStream = storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
        sharedStream.write(42);
        sharedStream.flush();
        StreamStateHandle sharedHandle = sharedStream.closeAndGetHandle();
        Assert.assertEquals((long)1L, (long)fs.listStatus(storageLocation.getCheckpointDirectory()).length);
        Assert.assertEquals((long)1L, (long)fs.listStatus(storageLocation.getSharedStateDirectory()).length);
        exclusiveHandle.discardState();
        sharedHandle.discardState();
    }

    @Test
    public void testStorageLocationDoesNotMkdirs() throws Exception {
        FsCheckpointStorage storage = new FsCheckpointStorage(this.randomTempPath(), null, new JobID(), 1024);
        File baseDir = new File(storage.getCheckpointsDirectory().getPath());
        Assert.assertTrue((boolean)baseDir.exists());
        FsCheckpointStorageLocation location = (FsCheckpointStorageLocation)storage.resolveCheckpointStorageLocation(177L, CheckpointStorageLocationReference.getDefault());
        Path checkpointPath = location.getCheckpointDirectory();
        File checkpointDir = new File(checkpointPath.getPath());
        Assert.assertFalse((boolean)checkpointDir.exists());
    }

    @Test
    public void testResolveCheckpointStorageLocation() throws Exception {
        FileSystem checkpointFileSystem = (FileSystem)Mockito.mock(FileSystem.class);
        FsCheckpointStorage storage = new FsCheckpointStorage((Path)new TestingPath("hdfs:///checkpoint/", checkpointFileSystem), null, new JobID(), 1024);
        FsCheckpointStorageLocation checkpointStreamFactory = (FsCheckpointStorageLocation)storage.resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
        Assert.assertEquals((Object)checkpointFileSystem, (Object)checkpointStreamFactory.getFileSystem());
        CheckpointStorageLocationReference savepointLocationReference = AbstractFsCheckpointStorage.encodePathAsReference((Path)new Path("file:///savepoint/"));
        FsCheckpointStorageLocation savepointStreamFactory = (FsCheckpointStorageLocation)storage.resolveCheckpointStorageLocation(2L, savepointLocationReference);
        FileSystem fileSystem = savepointStreamFactory.getFileSystem();
        Assert.assertTrue((boolean)(fileSystem instanceof LocalFileSystem));
    }

    private void assertParent(Path parent, Path child) {
        Path path = new Path(parent, child.getName());
        Assert.assertEquals((Object)path, (Object)child);
    }

    private static final class TestingPath
    extends Path {
        private static final long serialVersionUID = 2560119808844230488L;
        @Nonnull
        private final transient FileSystem fileSystem;

        TestingPath(String pathString, @Nonnull FileSystem fileSystem) {
            super(pathString);
            this.fileSystem = fileSystem;
        }

        public FileSystem getFileSystem() throws IOException {
            return this.fileSystem;
        }
    }
}

