/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.taskmanager;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public class AbstractTaskManagerFileHandlerTest
extends TestLogger {
    /** Task manager id sent as the path parameter of the request and checked by the test handler. */
    private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate();
    /** GET request for "/foobar" that is handed to the handler under test in every test case. */
    private static final DefaultFullHttpRequest HTTP_REQUEST = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/foobar");
    /** Class-wide temporary folder holding the blob storage directory and all input/output files. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Blob server shared by all tests; created in {@code setup()} and closed in {@code teardown()}. */
    private static BlobServer blobServer;
    /** Handler request shared by all tests; built once in {@code setup()}. */
    private static HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> handlerRequest;
    /** Random content of the first file; regenerated before each test in {@code setupTest()}. */
    private String fileContent1;
    /** Blob key under which the first file was stored in the blob server. */
    private TransientBlobKey transientBlobKey1;
    /** Random content of the second file; regenerated before each test in {@code setupTest()}. */
    private String fileContent2;
    /** Blob key under which the second file was stored in the blob server. */
    private TransientBlobKey transientBlobKey2;

    /**
     * Creates the fixtures shared by all tests: a {@link BlobServer} whose storage directory lives
     * in the temporary folder, and a handler request carrying the expected task manager id as its
     * only path parameter.
     */
    @BeforeClass
    public static void setup() throws IOException, HandlerRequestException {
        final String blobStorageDirectory = temporaryFolder.newFolder().getAbsolutePath();
        final Configuration blobServerConfig = new Configuration();
        blobServerConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, blobStorageDirectory);
        blobServer = new BlobServer(blobServerConfig, new VoidBlobStore());

        final Map<String, String> pathParameters =
            Collections.singletonMap("taskmanagerid", EXPECTED_TASK_MANAGER_ID.getResourceIdString());
        handlerRequest = new HandlerRequest<>(
            EmptyRequestBody.getInstance(),
            new TaskManagerMessageParameters(),
            pathParameters,
            Collections.emptyMap());
    }

    /**
     * Generates two files with fresh random content before each test and stores both in the blob
     * server, remembering the content and the resulting blob key of each.
     */
    @Before
    public void setupTest() throws IOException {
        this.fileContent1 = UUID.randomUUID().toString();
        this.transientBlobKey1 = storeFileInBlobServer(createFileWithContent(this.fileContent1));

        this.fileContent2 = UUID.randomUUID().toString();
        this.transientBlobKey2 = storeFileInBlobServer(createFileWithContent(this.fileContent2));
    }

    /** Closes the shared blob server, if one was created, and clears the static reference. */
    @AfterClass
    public static void teardown() throws IOException {
        if (blobServer == null) {
            return;
        }
        blobServer.close();
        blobServer = null;
    }

    /**
     * Issues a single request against a handler whose only queued upload is the first blob key and
     * verifies that the content written to the output file equals the first file's content.
     */
    @Test
    public void testFileServing() throws Exception {
        final Queue<CompletableFuture<TransientBlobKey>> uploads = new ArrayDeque<>(1);
        uploads.add(CompletableFuture.completedFuture(this.transientBlobKey1));

        final TestTaskManagerFileHandler handler =
            this.createTestTaskManagerFileHandler(Time.milliseconds(1000L), uploads, EXPECTED_TASK_MANAGER_ID);

        final File outputFile = temporaryFolder.newFile();
        final TestingChannelHandlerContext context = new TestingChannelHandlerContext(outputFile);
        handler.respondToRequest(context, HTTP_REQUEST, handlerRequest, null);

        Assert.assertThat(outputFile.length(), Matchers.is(Matchers.greaterThan(0L)));
        Assert.assertThat(FileUtils.readFileUtf8(outputFile), Matchers.is(Matchers.equalTo(this.fileContent1)));
    }

    /**
     * Sends two back-to-back requests with a 5 second cache entry duration and verifies that the
     * output still holds the first file's content after the second request.
     */
    @Test
    public void testFileCaching() throws Exception {
        final Time cacheEntryDuration = Time.milliseconds(5000L);
        final Time noDelay = Time.milliseconds(0L);

        final File servedFile = this.runFileCachingTest(cacheEntryDuration, noDelay);

        Assert.assertThat(servedFile.length(), Matchers.is(Matchers.greaterThan(0L)));
        Assert.assertThat(FileUtils.readFileUtf8(servedFile), Matchers.is(Matchers.equalTo(this.fileContent1)));
    }

    /**
     * Sends two requests separated by a pause equal to the 5 ms cache entry duration and verifies
     * that the output holds the second file's content after the second request.
     */
    @Test
    public void testFileCacheExpiration() throws Exception {
        final Time shortDuration = Time.milliseconds(5L);

        final File servedFile = this.runFileCachingTest(shortDuration, shortDuration);

        Assert.assertThat(servedFile.length(), Matchers.is(Matchers.greaterThan(0L)));
        Assert.assertThat(FileUtils.readFileUtf8(servedFile), Matchers.is(Matchers.equalTo(this.fileContent2)));
    }

    /**
     * Runs two identical requests against one handler, sleeping between them, and returns the file
     * that received the responses.
     *
     * <p>The handler's upload queue holds the first blob key followed by the second one, so the
     * final file content shows which key the second request was answered with.
     *
     * @param cacheEntryDuration cache entry duration passed to the handler under test
     * @param delayBetweenRequests time to sleep between the first and the second request
     * @return the output file written by the testing channel handler context
     */
    private File runFileCachingTest(Time cacheEntryDuration, Time delayBetweenRequests) throws Exception {
        final Queue<CompletableFuture<TransientBlobKey>> uploads = new ArrayDeque<>(2);
        uploads.add(CompletableFuture.completedFuture(this.transientBlobKey1));
        uploads.add(CompletableFuture.completedFuture(this.transientBlobKey2));

        final TestTaskManagerFileHandler handler =
            this.createTestTaskManagerFileHandler(cacheEntryDuration, uploads, EXPECTED_TASK_MANAGER_ID);

        final File outputFile = temporaryFolder.newFile();
        final TestingChannelHandlerContext context = new TestingChannelHandlerContext(outputFile);

        handler.respondToRequest(context, HTTP_REQUEST, handlerRequest, null);
        Thread.sleep(delayBetweenRequests.toMilliseconds());
        handler.respondToRequest(context, HTTP_REQUEST, handlerRequest, null);

        return outputFile;
    }

    /**
     * Builds the handler under test, wired to the shared blob server and a testing resource
     * manager gateway.
     *
     * @param cacheEntryDuration cache entry duration handed to the handler
     * @param requestFileUploads futures the handler returns, in order, for successive file upload requests
     * @param expectedTaskManagerId task manager id the handler asserts on for every upload request
     * @return the configured test handler
     */
    private TestTaskManagerFileHandler createTestTaskManagerFileHandler(Time cacheEntryDuration, Queue<CompletableFuture<TransientBlobKey>> requestFileUploads, ResourceID expectedTaskManagerId) {
        final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

        // The leader retriever only ever yields a completed future holding null.
        final GatewayRetriever<? extends RestfulGateway> leaderRetriever =
            () -> CompletableFuture.completedFuture(null);
        final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
            () -> CompletableFuture.completedFuture(resourceManagerGateway);

        return new TestTaskManagerFileHandler(
            CompletableFuture.completedFuture("localhost"),
            leaderRetriever,
            TestingUtils.infiniteTime(),
            Collections.emptyMap(),
            new TestUntypedMessageHeaders(),
            resourceManagerGatewayRetriever,
            blobServer,
            cacheEntryDuration,
            requestFileUploads,
            expectedTaskManagerId);
    }

    /**
     * Creates a new file in the temporary folder and writes the given text into it as UTF-8.
     *
     * @param fileContent text to write into the new file
     * @return the newly created file containing {@code fileContent}
     * @throws IOException if the file cannot be created or written
     */
    private static File createFileWithContent(String fileContent) throws IOException {
        final File file = temporaryFolder.newFile();
        try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
            // Charset constant instead of the "UTF-8" name: no lookup by string and no
            // checked UnsupportedEncodingException on the encode path.
            fileOutputStream.write(fileContent.getBytes(StandardCharsets.UTF_8));
        }
        return file;
    }

    /**
     * Uploads the content of the given file to the shared blob server as a transient blob.
     *
     * @param fileToStore file whose content is uploaded
     * @return the key returned by the transient blob service for the stored content
     * @throws IOException if the file cannot be read or the upload fails
     */
    private static TransientBlobKey storeFileInBlobServer(File fileToStore) throws IOException {
        try (InputStream fileContentStream = new FileInputStream(fileToStore)) {
            return blobServer.getTransientBlobService().putTransient(fileContentStream);
        }
    }

    /**
     * Minimal message headers for the test handler: a GET endpoint at "/foobar" that takes an
     * empty request body and task manager message parameters.
     */
    private static final class TestUntypedMessageHeaders
    implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {

        /** REST endpoint URL reported by these headers. */
        private static final String ENDPOINT_URL = "/foobar";

        private TestUntypedMessageHeaders() {
        }

        /** Returns the request body class, which is always {@link EmptyRequestBody}. */
        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        /** Returns a fresh, unresolved {@link TaskManagerMessageParameters} instance on each call. */
        public TaskManagerMessageParameters getUnresolvedMessageParameters() {
            return new TaskManagerMessageParameters();
        }

        /** Returns the HTTP method of the endpoint, which is GET. */
        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        /** Returns the endpoint URL, "/foobar". */
        public String getTargetRestEndpointURL() {
            return ENDPOINT_URL;
        }
    }

    /**
     * {@link ChannelHandlerContext} stub that captures served files.
     *
     * <p>Whenever a {@link DefaultFileRegion} is written, its content is copied into the output
     * file given at construction time, replacing whatever the file held before. All other written
     * messages are ignored. Apart from {@link #executor()} and {@link #pipeline()}, every remaining
     * operation is an inert stub returning {@code null} or {@code false}.
     */
    private static final class TestingChannelHandlerContext
    implements ChannelHandlerContext {
        /** File that receives the content of every written file region. */
        final File outputFile;

        private TestingChannelHandlerContext(File outputFile) {
            this.outputFile = Preconditions.checkNotNull(outputFile);
        }

        /**
         * Copies a written {@link DefaultFileRegion} into the output file; other messages are
         * dropped.
         *
         * @param msg message to write
         * @param promise ignored by this stub
         * @return a new promise bound to a fresh {@link EmbeddedChannel}; this method never completes it
         * @throws RuntimeException wrapping any {@link IOException} raised while copying
         */
        public ChannelFuture write(Object msg, ChannelPromise promise) {
            if (msg instanceof DefaultFileRegion) {
                final DefaultFileRegion fileRegion = (DefaultFileRegion) msg;
                try (FileOutputStream fileOutputStream = new FileOutputStream(this.outputFile)) {
                    final WritableByteChannel target = fileOutputStream.getChannel();
                    final long totalBytes = fileRegion.count();
                    long position = 0L;
                    // A single transferTo call may write fewer bytes than requested, so keep
                    // going until the whole region has been copied.
                    while (position < totalBytes) {
                        final long written = fileRegion.transferTo(target, position);
                        if (written <= 0L) {
                            // No progress; stop rather than spin forever.
                            break;
                        }
                        position += written;
                    }
                }
                catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
            return new DefaultChannelPromise(new EmbeddedChannel());
        }

        /** Returns the immediate event executor. */
        public EventExecutor executor() {
            return ImmediateEventExecutor.INSTANCE;
        }

        /** Delegates to {@link #write(Object, ChannelPromise)} with a {@code null} promise. */
        public ChannelFuture write(Object msg) {
            return this.write(msg, null);
        }

        /** Writes the message and then calls the (no-op) {@link #flush()}. */
        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            final ChannelFuture writeResult = this.write(msg, promise);
            this.flush();
            return writeResult;
        }

        /** Delegates to {@link #writeAndFlush(Object, ChannelPromise)} with a {@code null} promise. */
        public ChannelFuture writeAndFlush(Object msg) {
            return this.writeAndFlush(msg, null);
        }

        /** Returns a new Mockito mock of {@link ChannelPipeline} on every call. */
        public ChannelPipeline pipeline() {
            return Mockito.mock(ChannelPipeline.class);
        }

        // ------------------------------------------------------------------
        // Everything below is an inert stub.
        // ------------------------------------------------------------------

        public Channel channel() {
            return null;
        }

        public String name() {
            return null;
        }

        public ChannelHandler handler() {
            return null;
        }

        public boolean isRemoved() {
            return false;
        }

        public ChannelHandlerContext fireChannelRegistered() {
            return null;
        }

        public ChannelHandlerContext fireChannelUnregistered() {
            return null;
        }

        public ChannelHandlerContext fireChannelActive() {
            return null;
        }

        public ChannelHandlerContext fireChannelInactive() {
            return null;
        }

        public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
            return null;
        }

        public ChannelHandlerContext fireUserEventTriggered(Object event) {
            return null;
        }

        public ChannelHandlerContext fireChannelRead(Object msg) {
            return null;
        }

        public ChannelHandlerContext fireChannelReadComplete() {
            return null;
        }

        public ChannelHandlerContext fireChannelWritabilityChanged() {
            return null;
        }

        public ChannelFuture bind(SocketAddress localAddress) {
            return null;
        }

        public ChannelFuture connect(SocketAddress remoteAddress) {
            return null;
        }

        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
            return null;
        }

        public ChannelFuture disconnect() {
            return null;
        }

        public ChannelFuture close() {
            return null;
        }

        public ChannelFuture deregister() {
            return null;
        }

        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return null;
        }

        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return null;
        }

        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
            return null;
        }

        public ChannelFuture disconnect(ChannelPromise promise) {
            return null;
        }

        public ChannelFuture close(ChannelPromise promise) {
            return null;
        }

        public ChannelFuture deregister(ChannelPromise promise) {
            return null;
        }

        public ChannelHandlerContext read() {
            return null;
        }

        public ChannelHandlerContext flush() {
            return null;
        }

        public ByteBufAllocator alloc() {
            return null;
        }

        public ChannelPromise newPromise() {
            return null;
        }

        public ChannelProgressivePromise newProgressivePromise() {
            return null;
        }

        public ChannelFuture newSucceededFuture() {
            return null;
        }

        public ChannelFuture newFailedFuture(Throwable cause) {
            return null;
        }

        public ChannelPromise voidPromise() {
            return null;
        }

        public <T> Attribute<T> attr(AttributeKey<T> key) {
            return null;
        }

        public <T> boolean hasAttr(AttributeKey<T> attributeKey) {
            return false;
        }
    }

    /**
     * Handler under test with a scripted upload step: each file upload request is answered with
     * the next future from a queue, after asserting that it targets the expected task manager.
     */
    private static final class TestTaskManagerFileHandler
    extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {
        /** Futures handed out, in order, for successive upload requests. */
        private final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads;
        /** Task manager id every upload request is expected to carry. */
        private final ResourceID expectedTaskManagerId;

        protected TestTaskManagerFileHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever, @Nonnull Time timeout, @Nonnull Map<String, String> responseHeaders, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, @Nonnull TransientBlobService transientBlobService, @Nonnull Time cacheEntryDuration, Queue<CompletableFuture<TransientBlobKey>> requestFileUploads, ResourceID expectedTaskManagerId) {
            super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
            this.requestFileUploads = Preconditions.checkNotNull(requestFileUploads);
            this.expectedTaskManagerId = Preconditions.checkNotNull(expectedTaskManagerId);
        }

        /**
         * Asserts that the request targets the expected task manager, then returns the next queued
         * upload future.
         *
         * @return the next queued future, or a future failed with a {@link FlinkException} once
         *     the queue is empty
         */
        protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
            Assert.assertThat(taskManagerResourceId, Matchers.is(Matchers.equalTo(this.expectedTaskManagerId)));

            final CompletableFuture<TransientBlobKey> nextUpload = this.requestFileUploads.poll();
            if (nextUpload == null) {
                return FutureUtils.completedExceptionally(new FlinkException("Could not upload file."));
            }
            return nextUpload;
        }
    }
}

