/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for loading a {@link StateBackend} from a {@link Configuration} via
 * {@link AbstractStateBackend#loadStateBackendFromConfig} and
 * {@link AbstractStateBackend#loadStateBackendFromConfigOrCreateDefault}.
 */
public class StateBackendLoadingTest {
    /** Temporary directory used as the checkpoint directory of the file system backend. */
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    /** Class loader handed to the backend loading methods. */
    private final ClassLoader cl = this.getClass().getClassLoader();

    /** Configuration key under which the state backend name / factory class is set. */
    private final String backendKey = CoreOptions.STATE_BACKEND.key();

    /** Without a configured backend, the plain loader is expected to return {@code null}. */
    @Test
    public void testNoStateBackendDefined() throws Exception {
        Assert.assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), this.cl, null));
    }

    /** Without a configured backend, the "or create default" loader is expected to yield a memory backend. */
    @Test
    public void testInstantiateMemoryBackendByDefault() throws Exception {
        StateBackend backend =
                AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(new Configuration(), this.cl, null);
        Assert.assertTrue(backend instanceof MemoryStateBackend);
    }

    /** Explicitly configuring the name {@code "jobmanager"} is expected to yield a memory backend. */
    @Test
    public void testLoadMemoryStateBackend() throws Exception {
        // The literal name is used on purpose so that a rename of the config value breaks this test.
        Configuration config = new Configuration();
        config.setString(this.backendKey, "jobmanager");

        // Load from the configuration prepared above; passing a fresh, empty Configuration
        // here would only exercise the default path and never the "jobmanager" setting.
        StateBackend backend =
                AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, this.cl, null);
        Assert.assertTrue(backend instanceof MemoryStateBackend);
    }

    /**
     * Loads the file system backend twice, once through the short name {@code "filesystem"} and
     * once through the fully qualified name of {@link FsStateBackendFactory}, and checks that
     * both results carry the configured base path and memory threshold.
     */
    @Test
    public void testLoadFileSystemStateBackend() throws Exception {
        String checkpointDir = new Path(this.tmp.getRoot().toURI()).toString();
        Path expectedPath = new Path(checkpointDir);
        int threshold = 1000000;

        // Variant 1: backend selected by its short name.
        Configuration config1 = new Configuration();
        config1.setString(this.backendKey, "filesystem");
        config1.setString("state.checkpoints.dir", checkpointDir);
        config1.setString("state.backend.fs.checkpointdir", checkpointDir);
        config1.setInteger("state.backend.fs.memory-threshold", threshold);

        // Variant 2: backend selected by the factory's class name.
        Configuration config2 = new Configuration();
        config2.setString(this.backendKey, FsStateBackendFactory.class.getName());
        config2.setString("state.checkpoints.dir", checkpointDir);
        config2.setString("state.backend.fs.checkpointdir", checkpointDir);
        config2.setInteger("state.backend.fs.memory-threshold", threshold);

        StateBackend backend1 =
                AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config1, this.cl, null);
        StateBackend backend2 =
                AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config2, this.cl, null);

        Assert.assertTrue(backend1 instanceof FsStateBackend);
        Assert.assertTrue(backend2 instanceof FsStateBackend);

        FsStateBackend fs1 = (FsStateBackend) backend1;
        FsStateBackend fs2 = (FsStateBackend) backend2;

        Assert.assertEquals(expectedPath, fs1.getBasePath());
        Assert.assertEquals(expectedPath, fs2.getBasePath());
        Assert.assertEquals(threshold, fs1.getMinFileSizeThreshold());
        Assert.assertEquals(threshold, fs2.getMinFileSizeThreshold());
    }

    /**
     * Checks three failure modes of backend loading: a class name that does not exist, a class
     * that exists but is not a state backend factory ({@link File}), and a factory whose
     * {@code createFromConfig} throws an {@link IOException}.
     */
    @Test
    public void testLoadingFails() throws Exception {
        Configuration config = new Configuration();

        // Case 1: the configured class cannot be found.
        config.setString(this.backendKey, "does.not.exist");
        try {
            AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, this.cl, null);
            Assert.fail("should fail with an exception");
        }
        catch (DynamicCodeLoadingException expected) {
            // expected: this is the outcome the test is looking for
        }

        // Case 2: the configured class exists but is not a StateBackendFactory.
        config.setString(this.backendKey, File.class.getName());
        try {
            AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, this.cl, null);
            Assert.fail("should fail with an exception");
        }
        catch (DynamicCodeLoadingException expected) {
            // expected: this is the outcome the test is looking for
        }

        // Case 3: the factory loads fine but fails while creating the backend.
        config.setString(this.backendKey, FailingFactory.class.getName());
        try {
            AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, this.cl, null);
            Assert.fail("should fail with an exception");
        }
        catch (IOException expected) {
            // expected: the IOException thrown by FailingFactory surfaces to the caller
        }
    }

    /** A state backend factory whose creation method always throws an {@link IOException}. */
    static final class FailingFactory
    implements StateBackendFactory<StateBackend> {
        private static final long serialVersionUID = 1L;

        FailingFactory() {
        }

        /**
         * Always fails.
         *
         * @param config ignored
         * @return never returns normally
         * @throws IOException always, with the message {@code "fail!"}
         */
        @Override
        public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException {
            throw new IOException("fail!");
        }
    }
}

