/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.BindException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestStandbyCheckpoints {
    private static final int NUM_DIRS_IN_LOG = 200000;
    protected MiniDFSCluster cluster;
    protected NameNode nn0;
    protected NameNode nn1;
    protected FileSystem fs;
    private final Random random = new Random();
    protected File tmpOivImgDir;
    private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class);

    @Before
    public void setupCluster() throws Exception {
        Configuration conf = this.setupCommonConfig();
        conf.setInt("dfs.namenode.num.checkpoints.retained", 1);
        conf.setInt("dfs.namenode.num.extra.edits.retained", 0);
        int retryCount = 0;
        while (true) {
            try {
                int basePort = 10060 + this.random.nextInt(100) * 2;
                MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort)).addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)));
                this.cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology).numDataNodes(1).build();
                this.cluster.waitActive();
                this.nn0 = this.cluster.getNameNode(0);
                this.nn1 = this.cluster.getNameNode(1);
                this.fs = HATestUtil.configureFailoverFs(this.cluster, conf);
                this.cluster.transitionToActive(0);
                ++retryCount;
            }
            catch (BindException e) {
                LOG.info((Object)("Set up MiniDFSCluster failed due to port conflicts, retry " + retryCount + " times"));
                continue;
            }
            break;
        }
    }

    protected Configuration setupCommonConfig() {
        this.tmpOivImgDir = GenericTestUtils.getTestDir((String)"TestStandbyCheckpoints");
        this.tmpOivImgDir.mkdirs();
        Configuration conf = new Configuration();
        conf.setInt("dfs.namenode.checkpoint.check.period", 1);
        conf.setInt("dfs.namenode.checkpoint.txns", 5);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        conf.set("dfs.namenode.legacy-oiv-image.dir", this.tmpOivImgDir.getAbsolutePath());
        conf.setBoolean("dfs.image.compress", true);
        conf.set("dfs.image.compression.codec", SlowCodec.class.getCanonicalName());
        CompressionCodecFactory.setCodecClasses((Configuration)conf, (List)ImmutableList.of(SlowCodec.class));
        return conf;
    }

    @After
    public void shutdownCluster() throws IOException {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
        if (this.tmpOivImgDir != null) {
            FileUtil.fullyDelete((File)this.tmpOivImgDir);
        }
    }

    @Test(timeout=300000L)
    public void testSBNCheckpoints() throws Exception {
        JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(this.nn1);
        this.doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        HATestUtil.waitForCheckpoint(this.cluster, 1, (List<Integer>)ImmutableList.of((Object)12));
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            public Boolean get() {
                if (TestStandbyCheckpoints.this.tmpOivImgDir.list().length > 0) {
                    return true;
                }
                return false;
            }
        }, (int)1000, (int)60000);
        Assert.assertEquals((String)"One file is expected", (long)1L, (long)this.tmpOivImgDir.list().length);
        HATestUtil.waitForCheckpoint(this.cluster, 0, (List<Integer>)ImmutableList.of((Object)12));
        ((JournalSet)Mockito.verify((Object)standbyJournalSet, (VerificationMode)Mockito.never())).purgeLogsOlderThan(Mockito.anyLong());
    }

    @Test
    public void testNewDirInitAfterCheckpointing() throws Exception {
        File hdfsDir = new File(PathUtils.getTestDir(TestStandbyCheckpoints.class), "testNewDirInitAfterCheckpointing");
        File nameDir = new File(hdfsDir, "name1");
        assert (nameDir.mkdirs());
        String existingDir = this.cluster.getConfiguration(0).get("dfs.namenode.name.dir");
        this.cluster.getConfiguration(0).set("dfs.namenode.name.dir", existingDir + "," + Util.fileAsURI((File)nameDir).toString());
        this.cluster.restartNameNode(0);
        this.nn0 = this.cluster.getNameNode(0);
        this.cluster.transitionToActive(0);
        File currDir = new File(nameDir, "current");
        File versionFile = new File(currDir, "VERSION");
        assert (currDir.exists());
        assert (!versionFile.exists());
        this.doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        for (int i = 0; i < 20 && !versionFile.exists(); ++i) {
            Thread.sleep(500L);
        }
        assert (versionFile.exists());
    }

    @Test(timeout=300000L)
    public void testBothNodesInStandbyState() throws Exception {
        this.doEdits(0, 10);
        this.cluster.transitionToStandby(0);
        HATestUtil.waitForCheckpoint(this.cluster, 1, (List<Integer>)ImmutableList.of((Object)12));
        HATestUtil.waitForCheckpoint(this.cluster, 0, (List<Integer>)ImmutableList.of((Object)12));
        Assert.assertEquals((long)12L, (long)this.nn0.getNamesystem().getFSImage().getMostRecentCheckpointTxId());
        Assert.assertEquals((long)12L, (long)this.nn1.getNamesystem().getFSImage().getMostRecentCheckpointTxId());
        ArrayList dirs = Lists.newArrayList();
        dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 0));
        dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 1));
        FSImageTestUtil.assertParallelFilesAreIdentical(dirs, (Set<String>)ImmutableSet.of());
    }

    @Test(timeout=300000L)
    public void testCheckpointWhenNoNewTransactionsHappened() throws Exception {
        this.cluster.getConfiguration(1).setInt("dfs.namenode.checkpoint.period", 0);
        this.cluster.restartNameNode(1);
        this.nn1 = this.cluster.getNameNode(1);
        FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(this.nn1);
        Thread.sleep(1000L);
        ((FSImage)Mockito.verify((Object)spyImage1, (VerificationMode)Mockito.never())).saveNamespace((FSNamesystem)Mockito.anyObject());
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        Thread.sleep(2000L);
        ((FSImage)Mockito.verify((Object)spyImage1, (VerificationMode)Mockito.times((int)1))).saveNamespace((FSNamesystem)Mockito.anyObject(), (NNStorage.NameNodeFile)Mockito.eq((Object)NNStorage.NameNodeFile.IMAGE), (Canceler)Mockito.anyObject());
    }

    @Test(timeout=120000L)
    public void testCheckpointCancellation() throws Exception {
        this.cluster.transitionToStandby(0);
        URI sharedUri = this.cluster.getSharedEditsDir(0, 1);
        File sharedDir = new File(sharedUri.getPath(), "current");
        File tmpDir = new File(MiniDFSCluster.getBaseDirectory(), "testCheckpointCancellation-tmp");
        FSNamesystem fsn = this.cluster.getNamesystem(0);
        FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, 200000, 3L, fsn.getFSDirectory().getLastInodeId() + 1L);
        String fname = NNStorage.getInProgressEditsFileName((long)3L);
        new File(tmpDir, fname).renameTo(new File(sharedDir, fname));
        this.cluster.getConfiguration(1).setInt("dfs.namenode.checkpoint.period", 0);
        this.cluster.restartNameNode(1);
        this.nn1 = this.cluster.getNameNode(1);
        this.cluster.transitionToActive(0);
        boolean canceledOne = false;
        for (int i = 0; i < 10 && !canceledOne; ++i) {
            this.doEdits(i * 10, i * 10 + 10);
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            this.cluster.transitionToStandby(1);
            this.cluster.transitionToActive(0);
            canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
        }
        Assert.assertTrue((boolean)canceledOne);
    }

    @Test(timeout=60000L)
    public void testCheckpointCancellationDuringUpload() throws Exception {
        this.cluster.getConfiguration(0).setInt("dfs.namenode.checkpoint.txns", 1000);
        this.cluster.getConfiguration(0).setBoolean("dfs.image.compress", false);
        this.cluster.getConfiguration(1).setBoolean("dfs.image.compress", false);
        this.cluster.getConfiguration(1).setLong("dfs.image.transfer.bandwidthPerSec", 100L);
        this.cluster.restartNameNode(0);
        this.cluster.restartNameNode(1);
        this.nn0 = this.cluster.getNameNode(0);
        this.nn1 = this.cluster.getNameNode(1);
        this.cluster.transitionToActive(0);
        this.doEdits(0, 100);
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        HATestUtil.waitForCheckpoint(this.cluster, 1, (List<Integer>)ImmutableList.of((Object)104));
        this.cluster.transitionToStandby(0);
        this.cluster.transitionToActive(1);
        this.cluster.shutdown();
        this.cluster = null;
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            public Boolean get() {
                ThreadInfo[] threads;
                ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
                for (ThreadInfo thread : threads = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 1)) {
                    if (!thread.getThreadName().startsWith("TransferFsImageUpload")) continue;
                    return false;
                }
                return true;
            }
        }, (int)1000, (int)30000);
        Assert.assertEquals((long)0L, (long)this.nn0.getFSImage().getMostRecentCheckpointTxId());
    }

    @Test(timeout=300000L)
    public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
        FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(this.nn1);
        GenericTestUtils.DelayAnswer answerer = new GenericTestUtils.DelayAnswer(LOG);
        ((FSImage)Mockito.doAnswer((Answer)answerer).when((Object)spyImage1)).saveNamespace((FSNamesystem)Mockito.any(FSNamesystem.class), (NNStorage.NameNodeFile)Mockito.eq((Object)NNStorage.NameNodeFile.IMAGE), (Canceler)Mockito.any(Canceler.class));
        this.doEdits(0, 1000);
        this.nn0.getRpcServer().rollEditLog();
        answerer.waitForCall();
        Assert.assertTrue((String)"SBN is not performing checkpoint but it should be.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 0 ? 1 : 0) != 0);
        ThreadUtil.sleepAtLeastIgnoreInterrupts((long)1000L);
        try {
            this.nn1.getRpcServer().getFileInfo("/");
            Assert.fail((String)"Should have thrown StandbyException, but instead succeeded.");
        }
        catch (StandbyException se) {
            GenericTestUtils.assertExceptionContains((String)"is not supported", (Throwable)se);
        }
        Assert.assertEquals((long)0L, (long)this.cluster.getNamesystem(1).getPendingDataNodeMessageCount());
        this.doCreate();
        Thread.sleep(1000L);
        Assert.assertTrue((this.cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((String)"SBN should have still been checkpointing.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 0 ? 1 : 0) != 0);
        answerer.proceed();
        answerer.waitForResult();
        Assert.assertTrue((String)"SBN should have finished checkpointing.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 1 ? 1 : 0) != 0);
    }

    @Test(timeout=300000L)
    public void testReadsAllowedDuringCheckpoint() throws Exception {
        FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(this.nn1);
        GenericTestUtils.DelayAnswer answerer = new GenericTestUtils.DelayAnswer(LOG);
        ((FSImage)Mockito.doAnswer((Answer)answerer).when((Object)spyImage1)).saveNamespace((FSNamesystem)Mockito.any(FSNamesystem.class), (NNStorage.NameNodeFile)Mockito.any(NNStorage.NameNodeFile.class), (Canceler)Mockito.any(Canceler.class));
        this.doEdits(0, 1000);
        this.nn0.getRpcServer().rollEditLog();
        answerer.waitForCall();
        Assert.assertTrue((String)"SBN is not performing checkpoint but it should be.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 0 ? 1 : 0) != 0);
        ThreadUtil.sleepAtLeastIgnoreInterrupts((long)1000L);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    TestStandbyCheckpoints.this.nn1.getRpcServer().restoreFailedStorage("false");
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        t.start();
        ThreadUtil.sleepAtLeastIgnoreInterrupts((long)1000L);
        Assert.assertFalse((boolean)this.nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
        Assert.assertFalse((boolean)this.nn1.getNamesystem().getFsLockForTests().isWriteLocked());
        Assert.assertTrue((boolean)this.nn1.getNamesystem().getCpLockForTests().hasQueuedThreads());
        String pageContents = DFSTestUtil.urlGet(new URL("http://" + this.nn1.getHttpAddress().getHostName() + ":" + this.nn1.getHttpAddress().getPort() + "/jmx"));
        Assert.assertTrue((boolean)pageContents.contains("NumLiveDataNodes"));
        Assert.assertTrue((String)"SBN should have still been checkpointing.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 0 ? 1 : 0) != 0);
        answerer.proceed();
        answerer.waitForResult();
        Assert.assertTrue((String)"SBN should have finished checkpointing.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 1 ? 1 : 0) != 0);
        t.join();
    }

    @Test(timeout=300000L)
    public void testCheckpointSucceedsWithLegacyOIVException() throws Exception {
        FileUtil.fullyDelete((File)this.tmpOivImgDir);
        this.doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        HATestUtil.waitForCheckpoint(this.cluster, 1, (List<Integer>)ImmutableList.of((Object)12));
        HATestUtil.waitForCheckpoint(this.cluster, 0, (List<Integer>)ImmutableList.of((Object)12));
    }

    private void doEdits(int start, int stop) throws IOException {
        for (int i = start; i < stop; ++i) {
            Path p = new Path("/test" + i);
            this.fs.mkdirs(p);
        }
    }

    private void doCreate() throws IOException {
        Path p = new Path("/testFile");
        this.fs.delete(p, false);
        FSDataOutputStream out = this.fs.create(p, (short)1);
        out.write(42);
        out.close();
    }

    public static class SlowCodec
    extends GzipCodec {
        public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
            CompressionOutputStream ret = super.createOutputStream(out);
            CompressionOutputStream spy = (CompressionOutputStream)Mockito.spy((Object)ret);
            ((CompressionOutputStream)Mockito.doAnswer((Answer)new GenericTestUtils.SleepAnswer(5)).when((Object)spy)).write((byte[])Mockito.any(), Mockito.anyInt(), Mockito.anyInt());
            return spy;
        }
    }
}

