/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobCachePutTest;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerConnection;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BlobClientTest
extends TestLogger {
    /** Size in bytes of the test payload; the buffer-building helpers in this class use the same value (17000). */
    private static final int TEST_BUFFER_SIZE = 17000;
    /** Server under test; created and started in {@code startServer()}, closed in {@code stopServer()}. */
    static BlobServer BLOB_SERVER;
    /** Client-side configuration; set to a new, empty {@link Configuration} in {@code startServer()}. */
    static Configuration clientConfig;
    /** Class-level temporary directory rule; supplies the server storage directory and scratch files. */
    @ClassRule
    public static TemporaryFolder temporaryFolder;

    /**
     * Creates and starts the {@link BlobServer} used by the tests of this class.
     *
     * <p>The server stores its files in a fresh sub-folder of {@code temporaryFolder} and is
     * backed by a {@link VoidBlobStore}. The client configuration is initialised to an empty
     * {@link Configuration} once the server has been started.
     *
     * @throws IOException if the storage folder cannot be created or the server fails to start
     */
    @BeforeClass
    public static void startServer() throws IOException {
        final Configuration serverConfig = new Configuration();
        final File storageDir = temporaryFolder.newFolder();
        serverConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, storageDir.getAbsolutePath());

        final BlobStore blobStore = new VoidBlobStore();
        BLOB_SERVER = new BlobServer(serverConfig, blobStore);
        BLOB_SERVER.start();

        clientConfig = new Configuration();
    }

    /**
     * Closes the shared {@link BlobServer}, if one was created.
     *
     * @throws IOException if closing the server fails
     */
    @AfterClass
    public static void stopServer() throws IOException {
        final BlobServer server = BLOB_SERVER;
        if (server == null) {
            // startServer() never got as far as creating the server; nothing to release.
            return;
        }
        server.close();
    }

    /**
     * Builds the in-memory test payload.
     *
     * @return a new 17000-byte array (the same value as {@code TEST_BUFFER_SIZE}) whose element at
     *     index {@code i} is {@code i % 128}, i.e. the values 0..127 repeated
     */
    private static byte[] createTestBuffer() {
        final byte[] pattern = new byte[17000];
        int index = 0;
        while (index < pattern.length) {
            pattern[index] = (byte) (index % 128);
            index++;
        }
        return pattern;
    }

    /**
     * Fills {@code file} with test data and returns the digest of everything written.
     *
     * <p>The content is a 17000-byte pattern (byte {@code i} is {@code i % 128}) written 20 times
     * in a row. The same bytes are fed to a {@link MessageDigest} obtained from
     * {@code BlobUtils.createMessageDigest()}.
     *
     * @param file file to (over)write with the test data
     * @return the digest of the bytes written to {@code file}
     * @throws IOException if the file cannot be opened or written
     */
    private static byte[] prepareTestFile(File file) throws IOException {
        MessageDigest md = BlobUtils.createMessageDigest();

        byte[] buf = new byte[17000];
        for (int i = 0; i < buf.length; ++i) {
            buf[i] = (byte) (i % 128);
        }

        // Open the stream in the resource header: a try-with-resources variable is implicitly
        // final, so it cannot be declared as null and assigned inside the body.
        try (FileOutputStream fos = new FileOutputStream(file)) {
            for (int i = 0; i < 20; ++i) {
                fos.write(buf);
                md.update(buf);
            }
        }

        return md.digest();
    }

    /**
     * Reads {@code actualInputStream} to its end, checks that it delivered exactly the bytes of
     * {@code expectedBuf}, and closes the stream.
     *
     * @param actualInputStream stream to verify; always closed before this method returns or throws
     * @param expectedBuf the exact content the stream is expected to deliver
     * @throws EOFException if the stream ends before {@code expectedBuf.length} bytes were read
     * @throws IOException if reading from or closing the stream fails
     */
    static void validateGetAndClose(InputStream actualInputStream, byte[] expectedBuf) throws IOException {
        try {
            final byte[] received = new byte[expectedBuf.length];
            int filled = 0;

            // Keep reading until the buffer is full; at least one read is always issued.
            do {
                final int count = actualInputStream.read(received, filled, received.length - filled);
                if (count < 0) {
                    throw new EOFException();
                }
                filled += count;
            } while (filled != received.length);

            // The stream must be exhausted now, and the content must match.
            Assert.assertEquals(-1L, (long) actualInputStream.read());
            Assert.assertArrayEquals(expectedBuf, received);
        } finally {
            actualInputStream.close();
        }
    }

    /**
     * Compares two streams byte by byte until both signal end-of-stream, then closes both.
     *
     * <p>Each iteration reads one byte from each stream and asserts the two results are equal, so a
     * length mismatch is reported as a difference between a data byte and {@code -1}.
     *
     * @param actualInputStream stream under test; always closed
     * @param expectedInputStream stream providing the expected content; always closed, even if
     *     closing {@code actualInputStream} fails
     * @throws IOException if reading from or closing either stream fails
     */
    static void validateGetAndClose(InputStream actualInputStream, InputStream expectedInputStream) throws IOException {
        // Resources are closed in reverse declaration order, so the actual stream is closed first
        // and the expected stream second. Unlike two consecutive close() calls in a finally
        // block, the second close still runs when the first one throws.
        try (InputStream expected = expectedInputStream;
                InputStream actual = actualInputStream) {
            int r1;
            do {
                r1 = actual.read();
                int r2 = expected.read();
                Assert.assertEquals((long) r2, (long) r1);
            } while (r1 >= 0);
        }
    }

    /**
     * Verifies that {@code actualInputStream} delivers exactly the content of {@code expectedFile}
     * and closes the stream.
     *
     * @param actualInputStream stream under test; closed even if {@code expectedFile} cannot be opened
     * @param expectedFile file holding the expected content
     * @throws IOException if the file cannot be opened, or reading/closing a stream fails
     */
    static void validateGetAndClose(InputStream actualInputStream, File expectedFile) throws IOException {
        final InputStream expectedInputStream;
        try {
            expectedInputStream = new FileInputStream(expectedFile);
        } catch (IOException | RuntimeException openFailure) {
            // The comparison never starts, so nobody else will close the actual stream.
            try {
                actualInputStream.close();
            } catch (IOException closeFailure) {
                openFailure.addSuppressed(closeFailure);
            }
            throw openFailure;
        }
        BlobClientTest.validateGetAndClose(actualInputStream, expectedInputStream);
    }

    /** Runs the buffer upload/download scenario for {@code TRANSIENT_BLOB} keys. */
    @Test
    public void testContentAddressableBufferTransientBlob() throws IOException, InterruptedException {
        final BlobKey.BlobType blobType = BlobKey.BlobType.TRANSIENT_BLOB;
        testContentAddressableBuffer(blobType);
    }

    /**
     * Runs the buffer upload/download scenario for {@code PERMANENT_BLOB} keys.
     *
     * <p>The method name (including its "Permant" spelling) is left untouched so the reported test
     * name stays stable.
     */
    @Test
    public void testContentAddressableBufferPermantBlob() throws IOException, InterruptedException {
        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
        testContentAddressableBuffer(blobType);
    }

    /**
     * Uploads an in-memory buffer through a {@link BlobClient}, downloads it again and checks the
     * result, then checks that requesting a key that was never uploaded fails.
     *
     * <p>Steps:
     * <ol>
     *   <li>For transient blobs only: upload without a job id and compare the key's hash with the
     *       locally computed digest.</li>
     *   <li>Upload with a job id and compare the hash the same way.</li>
     *   <li>For transient blobs only: check the two keys via
     *       {@code BlobKeyTest.verifyKeyDifferentHashEquals}, download the job-less blob and wait
     *       for its deletion via {@code BlobCachePutTest.verifyDeletedEventually}.</li>
     *   <li>Download the job-related blob; for transient blobs also wait for its deletion.</li>
     *   <li>Request freshly created keys (once without, once with a job id, the latter on a new
     *       client) and require an {@link IOException} each time.</li>
     * </ol>
     *
     * @param blobType the kind of blob key to exercise
     * @throws IOException if a client/server interaction fails unexpectedly
     * @throws InterruptedException declared for the deletion checks
     */
    private void testContentAddressableBuffer(BlobKey.BlobType blobType) throws IOException, InterruptedException {
        final boolean isTransient = blobType == BlobKey.BlobType.TRANSIENT_BLOB;
        BlobClient client = null;
        try {
            byte[] testBuffer = BlobClientTest.createTestBuffer();
            MessageDigest md = BlobUtils.createMessageDigest();
            md.update(testBuffer);
            byte[] digest = md.digest();

            InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.getBlobServer().getPort());
            client = new BlobClient(serverAddress, this.getBlobClientConfig());
            JobID jobId = new JobID();

            // Job-unrelated upload (null job id): only done for transient blobs.
            BlobKey receivedKey1 = null;
            if (isTransient) {
                receivedKey1 = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
                Assert.assertArrayEquals(digest, receivedKey1.getHash());
            }

            // Job-related upload of the same content.
            BlobKey receivedKey2 = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
            Assert.assertArrayEquals(digest, receivedKey2.getHash());

            if (isTransient) {
                BlobKeyTest.verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);

                BlobClientTest.validateGetAndClose(client.getInternal(null, receivedKey1), testBuffer);
                BlobCachePutTest.verifyDeletedEventually(this.getBlobServer(), null, receivedKey1);
            }

            BlobClientTest.validateGetAndClose(client.getInternal(jobId, receivedKey2), testBuffer);
            if (isTransient) {
                BlobCachePutTest.verifyDeletedEventually(this.getBlobServer(), jobId, receivedKey2);
            }

            // A key that was just created must not be retrievable without a job id.
            try (InputStream ignored = client.getInternal(null, BlobKey.createKey(blobType))) {
                Assert.fail("Expected IOException did not occur");
            } catch (IOException expected) {
                // expected: this is the outcome the test requires
            }

            // Release the first client before replacing it, so its connection is not leaked.
            closeClientQuietly(client);
            client = new BlobClient(serverAddress, this.getBlobClientConfig());

            // Same check for a job-related request, on the new client.
            try (InputStream ignored = client.getInternal(jobId, BlobKey.createKey(blobType))) {
                Assert.fail("Expected IOException did not occur");
            } catch (IOException expected) {
                // expected: this is the outcome the test requires
            }
        } finally {
            closeClientQuietly(client);
        }
    }

    /** Closes {@code client} if it is non-null, deliberately ignoring any failure (best-effort cleanup). */
    private static void closeClientQuietly(BlobClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Throwable ignored) {
            // Cleanup only: a failure to close must not mask the actual test result.
        }
    }

    /**
     * Returns the configuration passed to every {@link BlobClient} created by these tests.
     *
     * <p>NOTE(review): protected and non-final, so presumably an override point for subclasses
     * that run the same tests with a different client configuration; confirm against subclasses.
     *
     * @return the static {@code clientConfig}
     */
    protected Configuration getBlobClientConfig() {
        return clientConfig;
    }

    /**
     * Returns the server the tests connect to.
     *
     * <p>NOTE(review): protected and non-final, so presumably an override point for subclasses
     * that run the same tests against a differently configured server; confirm against subclasses.
     *
     * @return the static {@code BLOB_SERVER}
     */
    protected BlobServer getBlobServer() {
        return BLOB_SERVER;
    }

    /** Runs the stream upload/download scenario for {@code TRANSIENT_BLOB} keys. */
    @Test
    public void testContentAddressableStreamTransientBlob() throws IOException, InterruptedException {
        final BlobKey.BlobType blobType = BlobKey.BlobType.TRANSIENT_BLOB;
        testContentAddressableStream(blobType);
    }

    /** Runs the stream upload/download scenario for {@code PERMANENT_BLOB} keys. */
    @Test
    public void testContentAddressableStreamPermanentBlob() throws IOException, InterruptedException {
        final BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
        testContentAddressableStream(blobType);
    }

    /**
     * Uploads a file through {@code BlobClient.putInputStream}, downloads it again and compares
     * the download with the file.
     *
     * <p>For transient blobs the file is first uploaded without a job id; the returned key's hash
     * must equal the digest computed while writing the file. The file is then uploaded with a job
     * id (all blob types) and the same hash check is applied, as in the buffer-based test. Downloads
     * are validated against the file, and for transient blobs the test waits for deletion via
     * {@code BlobCachePutTest.verifyDeletedEventually}.
     *
     * @param blobType the kind of blob key to exercise
     * @throws IOException if file handling or a client/server interaction fails
     * @throws InterruptedException declared for the deletion checks
     */
    private void testContentAddressableStream(BlobKey.BlobType blobType) throws IOException, InterruptedException {
        final boolean isTransient = blobType == BlobKey.BlobType.TRANSIENT_BLOB;

        File testFile = temporaryFolder.newFile();
        byte[] digest = BlobClientTest.prepareTestFile(testFile);

        try (BlobClient client = new BlobClient(new InetSocketAddress("localhost", this.getBlobServer().getPort()), this.getBlobClientConfig())) {
            JobID jobId = new JobID();

            // Each upload gets its own stream, scoped to the upload, so no file handle is left
            // open when the next upload starts or when an upload fails.
            BlobKey receivedKey1 = null;
            if (isTransient) {
                try (InputStream is = new FileInputStream(testFile)) {
                    receivedKey1 = client.putInputStream(null, is, blobType);
                }
                Assert.assertArrayEquals(digest, receivedKey1.getHash());
            }

            final BlobKey receivedKey2;
            try (InputStream is = new FileInputStream(testFile)) {
                receivedKey2 = client.putInputStream(jobId, is, blobType);
            }
            // Same check the buffer-based test applies to its job-related key.
            Assert.assertArrayEquals(digest, receivedKey2.getHash());

            if (isTransient) {
                BlobKeyTest.verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);

                BlobClientTest.validateGetAndClose(client.getInternal(null, receivedKey1), testFile);
                BlobCachePutTest.verifyDeletedEventually(this.getBlobServer(), null, receivedKey1);
            }

            BlobClientTest.validateGetAndClose(client.getInternal(jobId, receivedKey2), testFile);
            if (isTransient) {
                BlobCachePutTest.verifyDeletedEventually(this.getBlobServer(), jobId, receivedKey2);
            }
        }
    }

    /** Runs the interrupted-download scenario for a transient blob uploaded without a job id. */
    @Test
    public void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
        final JobID noJob = null;
        testGetFailsDuringStreaming(noJob, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the interrupted-download scenario for a transient blob uploaded under a new job id. */
    @Test
    public void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
        final JobID jobId = new JobID();
        testGetFailsDuringStreaming(jobId, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the interrupted-download scenario for a permanent blob uploaded under a new job id. */
    @Test
    public void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
        final JobID jobId = new JobID();
        testGetFailsDuringStreaming(jobId, BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Starts downloading a large blob, closes the server's active connections part-way through,
     * and then tries to read the remainder.
     *
     * <p>5,000,000 random bytes are uploaded and two chunks of 50,000 bytes each are read back.
     * Every connection returned by {@code getCurrentActiveConnections()} is then closed. The final
     * read may either throw an {@link IOException}, which is ignored, or complete, in which case
     * the received data must equal the uploaded data.
     *
     * @param jobId job to upload the blob under, or {@code null} for a job-unrelated blob
     * @param blobType the kind of blob key to exercise
     * @throws IOException if the upload, the first reads or closing a connection fails
     */
    private void testGetFailsDuringStreaming(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
        try (BlobClient blobClient = new BlobClient(serverAddress, getBlobClientConfig())) {
            final byte[] payload = new byte[5000000];
            new Random().nextBytes(payload);

            final BlobKey blobKey = blobClient.putBuffer(jobId, payload, 0, payload.length, blobType);
            Assert.assertNotNull(blobKey);

            final InputStream download = blobClient.getInternal(jobId, blobKey);
            final byte[] received = new byte[payload.length];
            final int chunkLen = 50000;

            // Read the first two chunks while the connection is still up.
            int offset = 0;
            BlobUtils.readFully(download, received, offset, chunkLen, null);
            offset += chunkLen;
            BlobUtils.readFully(download, received, offset, chunkLen, null);
            offset += chunkLen;

            // Cut the download off on the server side.
            for (BlobServerConnection connection : getBlobServer().getCurrentActiveConnections()) {
                connection.close();
            }

            try {
                BlobUtils.readFully(download, received, offset, payload.length - offset, null);
                // The read went through anyway; then the content has to be intact.
                Assert.assertArrayEquals(payload, received);
            } catch (IOException expected) {
                // A failing read after the connection was closed is an accepted outcome.
            }
        }
    }

    /** Runs the static jar-upload scenario against this test's server and client configuration. */
    @Test
    public void testUploadJarFilesHelper() throws Exception {
        final BlobServer server = getBlobServer();
        final Configuration config = getBlobClientConfig();
        BlobClientTest.uploadJarFile(server, config);
    }

    /**
     * Creates a temporary data file and uploads it to {@code blobServer} twice, validating the
     * stored content after each upload.
     *
     * <p>The file is created with {@code File.createTempFile} and registered for deletion on JVM
     * exit. Each upload runs under its own new job id.
     *
     * @param blobServer server to upload to; only its port is used, the host is "localhost"
     * @param blobClientConfig configuration for the blob clients created during the upload
     * @throws Exception if creating the file, uploading, or validating fails
     */
    static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
        final File payloadFile = File.createTempFile("testfile", ".dat");
        payloadFile.deleteOnExit();
        BlobClientTest.prepareTestFile(payloadFile);

        final InetSocketAddress address = new InetSocketAddress("localhost", blobServer.getPort());
        for (int round = 0; round < 2; round++) {
            BlobClientTest.uploadJarFile(address, blobClientConfig, payloadFile);
        }
    }

    /**
     * Uploads {@code testFile} under a new job id via {@code BlobClient.uploadJarFiles} and checks
     * the result.
     *
     * <p>Exactly one key must be returned, and downloading that key through a new
     * {@link BlobClient} must yield the content of {@code testFile}.
     *
     * @param serverAddress address of the blob server
     * @param blobClientConfig configuration for the upload and for the verifying client
     * @param testFile file to upload and to compare the download against
     * @throws IOException if the upload, the download or the comparison fails
     */
    private static void uploadJarFile(InetSocketAddress serverAddress, Configuration blobClientConfig, File testFile) throws IOException {
        JobID jobId = new JobID();
        // A bounded wildcard keeps this independent of the concrete key subtype returned while
        // avoiding the raw List and the unchecked cast on get(0).
        List<? extends BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, jobId, Collections.singletonList(new Path(testFile.toURI())));
        Assert.assertEquals(1L, (long) blobKeys.size());

        try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
            BlobClientTest.validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
        }
    }

    // Instantiates the class-level TemporaryFolder rule held in the temporaryFolder field.
    static {
        temporaryFolder = new TemporaryFolder();
    }
}

