/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCachePutTest;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests that transient BLOBs stored in a {@link BlobServer} are cleaned up after their expiry
 * time while permanent BLOBs stay available, plus static helpers for checking which BLOB files
 * exist in the storage directory of a BLOB service.
 */
public class BlobServerCleanupTest
extends TestLogger {
    private final Random rnd = new Random();

    /** Provides a fresh storage directory for every test; removed by JUnit afterwards. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Runs the transient BLOB cleanup scenario with BLOBs that are not bound to any job. */
    @Test
    public void testTransientBlobNoJobCleanup() throws IOException, InterruptedException, ExecutionException {
        this.testTransientBlobCleanup(null);
    }

    /** Runs the transient BLOB cleanup scenario with BLOBs that belong to a job. */
    @Test
    public void testTransientBlobForJobCleanup() throws IOException, InterruptedException, ExecutionException {
        this.testTransientBlobCleanup(new JobID());
    }

    /**
     * Uploads two transient BLOBs and one permanent BLOB, checks that put and get operations
     * push the expiry time of the accessed transient BLOB (and only that one) forward, and
     * finally verifies that both transient BLOBs are deleted eventually while the permanent
     * BLOB can still be read.
     *
     * @param jobId job the transient BLOBs are stored under, or {@code null} for none
     * @throws IOException if a BLOB operation fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if one of the concurrent get operations fails
     */
    private void testTransientBlobCleanup(@Nullable JobID jobId) throws IOException, InterruptedException, ExecutionException {
        // NOTE(review): this value is passed to BlobServerOptions.CLEANUP_INTERVAL and is also
        // added to millisecond timestamps below. If the option is in seconds (presumably), the
        // computed lower bounds are merely weaker than they could be - confirm.
        final long cleanupInterval = 1L;
        final int numberConcurrentGetOperations = 3;
        final Collection<CompletableFuture<Void>> getOperations = new ArrayList<>(numberConcurrentGetOperations);

        final byte[] data = new byte[2000000];
        this.rnd.nextBytes(data);
        final byte[] data2 = Arrays.copyOfRange(data, 10, 54);

        final Configuration config = new Configuration();
        config.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);

        try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
            final ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes = server.getBlobExpiryTimes();
            server.start();

            // after put(), each transient BLOB has an expiry time of at least "now + interval"
            long cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            final TransientBlobKey key1 = (TransientBlobKey) BlobServerPutTest.put(server, jobId, data, BlobKey.BlobType.TRANSIENT_BLOB);
            final Long key1ExpiryAfterPut = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1));
            Assert.assertThat(key1ExpiryAfterPut, Matchers.greaterThanOrEqualTo(cleanupLowerBound));

            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            final TransientBlobKey key2 = (TransientBlobKey) BlobServerPutTest.put(server, jobId, data2, BlobKey.BlobType.TRANSIENT_BLOB);
            final Long key2ExpiryAfterPut = transientBlobExpiryTimes.get(Tuple2.of(jobId, key2));
            Assert.assertThat(key2ExpiryAfterPut, Matchers.greaterThanOrEqualTo(cleanupLowerBound));

            // a permanent BLOB always needs a job; it is checked at the very end
            final JobID jobIdHA = jobId == null ? new JobID() : jobId;
            final BlobKey keyHA = BlobServerPutTest.put(server, jobIdHA, data, BlobKey.BlobType.PERMANENT_BLOB);

            // let the clock advance so that the expiry after the access is strictly greater
            Thread.sleep(1L);

            // accessing key1 must refresh key1's expiry time and leave key2's untouched
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents(server, jobId, key1, data);
            final Long key1ExpiryAfterGet = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1));
            Assert.assertThat(key1ExpiryAfterGet, Matchers.greaterThan(key1ExpiryAfterPut));
            Assert.assertThat(key1ExpiryAfterGet, Matchers.greaterThanOrEqualTo(cleanupLowerBound));
            Assert.assertEquals(key2ExpiryAfterPut, transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)));

            Thread.sleep(1L);

            // accessing key2 must refresh key2's expiry time and leave key1's untouched
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents(server, jobId, key2, data2);
            Assert.assertEquals(key1ExpiryAfterGet, transientBlobExpiryTimes.get(Tuple2.of(jobId, key1)));
            Assert.assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)), Matchers.greaterThan(key2ExpiryAfterPut));
            Assert.assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)), Matchers.greaterThanOrEqualTo(cleanupLowerBound));

            final long finishTime = System.currentTimeMillis() + 3L * cleanupInterval;

            final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
            try {
                for (int i = 0; i < numberConcurrentGetOperations; ++i) {
                    CompletableFuture<Void> getOperation = CompletableFuture.supplyAsync(() -> {
                        try {
                            // keep accessing key1 concurrently until finishTime has passed
                            while (System.currentTimeMillis() < finishTime) {
                                BlobServerGetTest.get(server, jobId, key1);
                            }
                            return null;
                        }
                        catch (IOException e) {
                            throw new CompletionException(new FlinkException("Could not retrieve blob.", e));
                        }
                    }, executor);
                    getOperations.add(getOperation);
                }

                // wait for all concurrent accesses; fails if any of them failed
                FutureUtils.combineAll(getOperations).get();
            }
            finally {
                // do not leak the pool's worker threads beyond this test
                executor.shutdownNow();
            }

            BlobCachePutTest.verifyDeletedEventually(server, jobId, new BlobKey[]{key1, key2});

            // the permanent BLOB must still be readable after the transient ones are gone
            BlobServerPutTest.verifyContents(server, jobIdHA, keyHA, data);
        }
    }

    /**
     * Counts how many of the given BLOB keys have an existing file at the storage location
     * reported by the given BLOB service.
     *
     * @param jobId job the BLOBs are stored under (passed through to {@code getStorageLocation})
     * @param keys BLOB keys to look up
     * @param blobService a {@link BlobServer}, {@link PermanentBlobCache} or {@link TransientBlobCache}
     * @param doThrow whether a missing file is reported via an {@link IOException} instead of
     *     just not being counted
     * @param <T> type of the BLOB service
     * @return number of keys whose file exists
     * @throws IOException if {@code doThrow} is set and a file does not exist
     * @throws UnsupportedOperationException if {@code blobService} is of any other type
     */
    public static <T> int checkFilesExist(JobID jobId, Collection<? extends BlobKey> keys, T blobService, boolean doThrow) throws IOException {
        int numFiles = 0;
        for (BlobKey key : keys) {
            final File blobFile;
            // the supported service classes are handled separately, hence one cast per branch
            if (blobService instanceof BlobServer) {
                blobFile = ((BlobServer) blobService).getStorageLocation(jobId, key);
            } else if (blobService instanceof PermanentBlobCache) {
                blobFile = ((PermanentBlobCache) blobService).getStorageLocation(jobId, key);
            } else if (blobService instanceof TransientBlobCache) {
                blobFile = ((TransientBlobCache) blobService).getStorageLocation(jobId, key);
            } else {
                throw new UnsupportedOperationException("unsupported BLOB service class: " + blobService.getClass().getCanonicalName());
            }

            if (blobFile.exists()) {
                ++numFiles;
            } else if (doThrow) {
                throw new IOException("File " + blobFile + " does not exist.");
            }
        }
        return numFiles;
    }

    /**
     * Asserts that the directory holding the given job's BLOBs contains exactly the expected
     * number of entries. The directory is derived from the parent of the storage location of a
     * freshly created {@link PermanentBlobKey}.
     *
     * @param expectedCount expected number of entries in the job's directory
     * @param jobId job whose directory is inspected
     * @param blobService a {@link BlobServer}; anything else is cast to {@link PermanentBlobCache}
     * @throws IOException if the directory cannot be listed although entries are expected
     */
    public static void checkFileCountForJob(int expectedCount, JobID jobId, PermanentBlobService blobService) throws IOException {
        final File jobDir;
        if (blobService instanceof BlobServer) {
            jobDir = ((BlobServer) blobService).getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
        } else {
            jobDir = ((PermanentBlobCache) blobService).getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
        }

        final File[] blobsForJob = jobDir.listFiles();
        if (blobsForJob == null) {
            // listFiles() yields null when the path is not a listable directory
            if (expectedCount != 0) {
                throw new IOException("File " + jobDir + " does not exist.");
            }
        } else {
            Assert.assertEquals("Too many/few files in job dir: " + Arrays.asList(blobsForJob).toString(), (long) expectedCount, (long) blobsForJob.length);
        }
    }
}

