/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.rest.ConnectionClosedException;
import org.apache.flink.runtime.rest.ConnectionException;
import org.apache.flink.runtime.rest.ConnectionIdleException;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.TreeNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestClient
implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
    // Shared mapper used both to serialize request bodies and to parse responses.
    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
    // Executor on which the request-writing and response-parsing stages are run.
    private final Executor executor;
    // Netty bootstrap used to open one connection per submitted request.
    private final Bootstrap bootstrap;
    // Completed by the listener registered in shutdownInternally once the event loop group has shut down.
    private final CompletableFuture<Void> terminationFuture;
    // Flipped to false by the first shutdown attempt so that shutdown is only triggered once.
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    /**
     * Creates a REST client whose connections use the pipeline
     * [ssl] -> HTTP codec -> aggregator -> chunked writer -> idle-state handler -> {@code ClientHandler}.
     *
     * @param configuration source of the SSL handler factory, max content length, idleness timeout
     *     and connection timeout; must not be null
     * @param executor executor on which request writing and response parsing are run; must not be null
     */
    public RestClient(final RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();
        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL is optional: only install the handler when a factory is configured.
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", (ChannelHandler) sslHandlerFactory.createNettySSLHandler());
                    }
                    final long idlenessTimeout = configuration.getIdlenessTimeout();
                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler())
                        .addLast(new IdleStateHandler(idlenessTimeout, idlenessTimeout, idlenessTimeout, TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    // Report through the logger instead of dumping the stack trace to stderr.
                    LOG.error("Could not initialize the REST client channel pipeline.", t);
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));
        this.bootstrap = new Bootstrap();
        this.bootstrap
            // toIntExact fails loudly if the configured timeout does not fit into an int.
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);
        LOG.info("Rest client endpoint started.");
    }

    /**
     * Triggers the shutdown of this client with a timeout of ten seconds.
     *
     * @return the future returned by the internal shutdown routine
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        final Time shutdownTimeout = Time.seconds(10L);
        return shutdownInternally(shutdownTimeout);
    }

    /**
     * Shuts the client down and waits up to {@code timeout} for the shutdown to finish.
     * Failures are logged rather than propagated.
     *
     * @param timeout passed to the internal shutdown routine and also used as the maximum
     *     time to wait for it to complete
     */
    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Rest endpoint shutdown failed.", e);
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    /**
     * Initiates the shutdown of the event loop group exactly once and returns the termination future.
     * Subsequent calls only return the same future.
     *
     * @param timeout upper bound handed to the event loop group's graceful shutdown
     * @return future completed when the event loop group has shut down (exceptionally on failure)
     */
    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");
            final EventLoopGroup group = bootstrap != null ? bootstrap.group() : null;
            if (group != null) {
                // Quiet period of 0: do not wait for further tasks before shutting down.
                group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
                    if (finished.isSuccess()) {
                        terminationFuture.complete(null);
                    } else {
                        terminationFuture.completeExceptionally(finished.cause());
                    }
                });
            } else {
                // Nothing to shut down; complete right away so that callers never wait forever.
                terminationFuture.complete(null);
            }
        }
        return terminationFuture;
    }

    /**
     * Sends a request without file uploads; delegates to the overload accepting uploads
     * with an empty upload collection.
     *
     * @return future of the parsed response
     * @throws IOException propagated from the delegated overload
     */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
        final Collection<FileUpload> noFileUploads = Collections.emptyList();
        return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, noFileUploads);
    }

    /**
     * Sends a request with file uploads against the latest API version supported by the
     * given message headers.
     *
     * @return future of the parsed response
     * @throws IOException propagated from the delegated overload
     */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request, Collection<FileUpload> fileUploads) throws IOException {
        final RestAPIVersion latestSupportedVersion = RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions());
        return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, fileUploads, latestSupportedVersion);
    }

    /**
     * Validates the arguments, serializes the request body to JSON, builds the HTTP request
     * (plain or multipart, depending on {@code fileUploads}) and submits it.
     *
     * @param targetAddress host to connect to; must not be null
     * @param targetPort port to connect to; must lie in [0, 65535]
     * @param messageHeaders headers describing HTTP method, URL, response type and supported versions
     * @param messageParameters resolved path/query parameters
     * @param request request body that is serialized with the shared object mapper
     * @param fileUploads files to attach; an empty collection yields a plain JSON request
     * @param apiVersion API version to address; must be supported by {@code messageHeaders}
     * @return future of the parsed response
     * @throws IOException if the body cannot be serialized or the request cannot be encoded
     * @throws IllegalArgumentException if the port is out of range or the version is unsupported
     * @throws IllegalStateException if the message parameters are not resolved
     */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request, Collection<FileUpload> fileUploads, RestAPIVersion apiVersion) throws IOException {
        Preconditions.checkNotNull(targetAddress);
        // The message now states the range that is actually enforced by the condition.
        Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range [0, 65535].");
        Preconditions.checkNotNull(messageHeaders);
        Preconditions.checkNotNull(request);
        Preconditions.checkNotNull(messageParameters);
        Preconditions.checkNotNull(fileUploads);
        Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");

        if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
            throw new IllegalArgumentException(String.format(
                "The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
                apiVersion,
                messageHeaders.getHttpMethod(),
                messageHeaders.getTargetRestEndpointURL(),
                messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
        }

        String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
        String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
        LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);

        // Serialize the request body to JSON and wrap the bytes without copying them.
        StringWriter sw = new StringWriter();
        objectMapper.writeValue(sw, request);
        ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));

        Request httpRequest = RestClient.createRequest(targetAddress + ':' + targetPort, targetUrl, messageHeaders.getHttpMethod().getNettyHttpMethod(), payload, fileUploads);

        // Responses may be parametric types; build the matching Jackson type in that case.
        Collection<Class<?>> typeParameters = messageHeaders.getResponseTypeParameters();
        JavaType responseType = typeParameters.isEmpty()
            ? objectMapper.constructType(messageHeaders.getResponseClass())
            : objectMapper.getTypeFactory().constructParametricType(messageHeaders.getResponseClass(), typeParameters.toArray(new Class<?>[0]));

        return submitRequest(targetAddress, targetPort, httpRequest, responseType);
    }

    /**
     * Builds the request to send: a plain JSON request when there are no uploads, otherwise a
     * multipart request carrying the JSON payload as attribute {@code "request"} and each file
     * as {@code "file_<index>"}.
     *
     * @param targetAddress value of the {@code Host} header
     * @param targetUrl request URI
     * @param httpMethod HTTP method to use
     * @param jsonPayload serialized request body
     * @param fileUploads files to attach; directories are rejected
     * @return the request wrapper that knows how to write itself to a channel
     * @throws IOException if the multipart body cannot be encoded or finalized
     * @throws IllegalArgumentException if one of the uploads is a directory
     */
    private static Request createRequest(String targetAddress, String targetUrl, HttpMethod httpMethod, ByteBuf jsonPayload, Collection<FileUpload> fileUploads) throws IOException {
        if (fileUploads.isEmpty()) {
            HttpRequest jsonRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload);
            jsonRequest.headers()
                .set("Host", targetAddress)
                .set("Connection", "close")
                // The content length is the number of readable bytes, not the buffer capacity.
                .add("Content-Length", jsonPayload.readableBytes())
                .add("Content-Type", RestConstants.REST_CONTENT_TYPE);
            return new SimpleRequest(jsonRequest);
        }

        HttpRequest multipartRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl);
        multipartRequest.headers()
            .set("Host", targetAddress)
            .set("Connection", "close");

        HttpPostRequestEncoder bodyRequestEncoder;
        try {
            DefaultHttpDataFactory httpDataFactory = new DefaultHttpDataFactory(true);
            bodyRequestEncoder = new HttpPostRequestEncoder(httpDataFactory, multipartRequest, true);

            MemoryAttribute requestAttribute = new MemoryAttribute("request");
            requestAttribute.setContent(jsonPayload);
            bodyRequestEncoder.addBodyHttpData(requestAttribute);

            int fileIndex = 0;
            for (FileUpload fileUpload : fileUploads) {
                Path path = fileUpload.getFile();
                if (Files.isDirectory(path)) {
                    throw new IllegalArgumentException("Upload of directories is not supported. Dir=" + path);
                }
                File file = path.toFile();
                LOG.trace("Adding file {} to request.", file);
                bodyRequestEncoder.addBodyFileUpload("file_" + fileIndex, file, fileUpload.getContentType(), false);
                ++fileIndex;
            }
        } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
            throw new IOException("Could not encode request.", e);
        }

        // finalizeRequest() returns an HttpRequest, so the result is held in an HttpRequest variable.
        HttpRequest finalizedRequest;
        try {
            finalizedRequest = bodyRequestEncoder.finalizeRequest();
        } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
            throw new IOException("Could not finalize request.", e);
        }
        return new MultipartRequest(finalizedRequest, bodyRequestEncoder);
    }

    /**
     * Connects to the target, writes the request on the new channel and parses the JSON
     * response into the expected type. Writing and parsing are run on this client's executor.
     *
     * @param targetAddress host to connect to
     * @param targetPort port to connect to
     * @param httpRequest request to write once the connection is established
     * @param responseType Jackson type the response is parsed into
     * @return future of the parsed response; fails if connecting, writing or parsing fails
     */
    private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
        final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);

        // Bridge the Netty connect future into a typed CompletableFuture.
        final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
        connectFuture.addListener((ChannelFuture future) -> {
            if (future.isSuccess()) {
                channelFuture.complete(future.channel());
            } else {
                channelFuture.completeExceptionally(future.cause());
            }
        });

        final CompletableFuture<JsonResponse> rawResponseFuture = channelFuture.thenComposeAsync(channel -> {
            ClientHandler handler = channel.pipeline().get(ClientHandler.class);

            CompletableFuture<JsonResponse> future;
            boolean success = false;
            try {
                if (handler == null) {
                    throw new IOException("Netty pipeline was not properly initialized.");
                }
                httpRequest.writeTo(channel);
                future = handler.getJsonFuture();
                success = true;
            } catch (IOException e) {
                future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
            } finally {
                // Do not leak the connection if the request could not be handed to the channel.
                if (!success) {
                    channel.close();
                }
            }
            return future;
        }, executor);

        return rawResponseFuture.thenComposeAsync(
            (JsonResponse rawResponse) -> RestClient.<P>parseResponse(rawResponse, responseType),
            executor);
    }

    /**
     * Converts the raw JSON response into the expected response type. If that fails, the JSON is
     * interpreted as an {@code ErrorResponseBody} and the future fails with its error list; if
     * that fails as well, the future fails with the original parse exception as cause.
     *
     * @param rawResponse JSON tree and HTTP status received from the server
     * @param responseType Jackson type to convert the JSON tree into
     * @return already-completed future holding the parsed response or the failure
     */
    private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, JavaType responseType) {
        final CompletableFuture<P> responseFuture = new CompletableFuture<>();
        final JsonParser jsonParser = objectMapper.treeAsTokens(rawResponse.getJson());
        try {
            P response = objectMapper.readValue(jsonParser, responseType);
            responseFuture.complete(response);
        } catch (IOException originalException) {
            // Not the expected type; check whether the server sent an error response instead.
            try {
                ErrorResponseBody error = objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class);
                responseFuture.completeExceptionally(new RestClientException(error.errors.toString(), rawResponse.getHttpResponseStatus()));
            } catch (JsonProcessingException jpe2) {
                // Trailing throwable in the argument array is logged as the exception by SLF4J.
                LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", new Object[]{responseType, rawResponse, jpe2});
                responseFuture.completeExceptionally(new RestClientException(
                    "Response was neither of the expected type(" + responseType + ") nor an error.",
                    originalException,
                    rawResponse.getHttpResponseStatus()));
            }
        }
        return responseFuture;
    }

    /** Immutable holder pairing a parsed JSON tree with the HTTP status it was received with. */
    private static final class JsonResponse {
        private final JsonNode json;
        private final HttpResponseStatus httpResponseStatus;

        /** Both arguments are required; a null argument is rejected by the precondition check. */
        private JsonResponse(JsonNode json, HttpResponseStatus httpResponseStatus) {
            Preconditions.checkNotNull(json);
            Preconditions.checkNotNull(httpResponseStatus);
            this.json = json;
            this.httpResponseStatus = httpResponseStatus;
        }

        /** @return the parsed JSON tree of the response */
        public JsonNode getJson() {
            return json;
        }

        /** @return the HTTP status of the response */
        public HttpResponseStatus getHttpResponseStatus() {
            return httpResponseStatus;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("JsonResponse{json=");
            text.append(json);
            text.append(", httpResponseStatus=");
            text.append(httpResponseStatus);
            text.append('}');
            return text.toString();
        }
    }

    private static class ClientHandler
    extends SimpleChannelInboundHandler<Object> {
        private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture();

        private ClientHandler() {
        }

        CompletableFuture<JsonResponse> getJsonFuture() {
            return this.jsonFuture;
        }

        protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof HttpResponse && ((HttpResponse)msg).status().equals((Object)HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE)) {
                this.jsonFuture.completeExceptionally((Throwable)((Object)new RestClientException(String.format(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE + ". Try to raise [%s]", RestOptions.CLIENT_MAX_CONTENT_LENGTH.key()), ((HttpResponse)msg).status())));
            } else if (msg instanceof FullHttpResponse) {
                this.readRawResponse((FullHttpResponse)msg);
            } else {
                LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
                if (msg instanceof HttpResponse) {
                    this.jsonFuture.completeExceptionally((Throwable)((Object)new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.", ((HttpResponse)msg).getStatus())));
                } else {
                    this.jsonFuture.completeExceptionally((Throwable)((Object)new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.", HttpResponseStatus.INTERNAL_SERVER_ERROR)));
                }
            }
            ctx.close();
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            this.jsonFuture.completeExceptionally(new ConnectionClosedException("Channel became inactive."));
            ctx.close();
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                this.jsonFuture.completeExceptionally(new ConnectionIdleException("Channel became idle."));
                ctx.close();
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            if (cause instanceof TooLongFrameException) {
                this.jsonFuture.completeExceptionally(new TooLongFrameException(String.format(cause.getMessage() + " Try to raise [%s]", RestOptions.CLIENT_MAX_CONTENT_LENGTH.key())));
            } else {
                this.jsonFuture.completeExceptionally(cause);
            }
            ctx.close();
        }

        private void readRawResponse(FullHttpResponse msg) {
            JsonNode rawResponse;
            ByteBuf content = msg.content();
            try (ByteBufInputStream in = new ByteBufInputStream(content);){
                rawResponse = objectMapper.readTree((InputStream)in);
                LOG.debug("Received response {}.", (Object)rawResponse);
            }
            catch (JsonProcessingException je) {
                LOG.error("Response was not valid JSON.", (Throwable)je);
                content.readerIndex(0);
                try (ByteBufInputStream in2 = new ByteBufInputStream(content);){
                    byte[] data = new byte[in2.available()];
                    in2.readFully(data);
                    String message = new String(data);
                    LOG.error("Unexpected plain-text response: {}", (Object)message);
                    this.jsonFuture.completeExceptionally((Throwable)((Object)new RestClientException("Response was not valid JSON, but plain-text: " + message, je, msg.getStatus())));
                }
                catch (IOException e) {
                    this.jsonFuture.completeExceptionally((Throwable)((Object)new RestClientException("Response was not valid JSON, nor plain-text.", je, msg.getStatus())));
                }
                return;
            }
            catch (IOException ioe) {
                LOG.error("Response could not be read.", (Throwable)ioe);
                this.jsonFuture.completeExceptionally((Throwable)((Object)new RestClientException("Response could not be read.", ioe, msg.getStatus())));
                return;
            }
            this.jsonFuture.complete(new JsonResponse(rawResponse, msg.getStatus()));
        }
    }

    /**
     * Request backed by a multipart encoder: writes the HTTP request, then the encoder itself
     * when the body is chunked, and cleans the encoder's files once the last write has finished.
     */
    private static final class MultipartRequest implements Request {
        private final HttpRequest httpRequest;
        private final HttpPostRequestEncoder bodyRequestEncoder;

        MultipartRequest(HttpRequest httpRequest, HttpPostRequestEncoder bodyRequestEncoder) {
            this.httpRequest = httpRequest;
            this.bodyRequestEncoder = bodyRequestEncoder;
        }

        @Override
        public void writeTo(Channel channel) {
            final ChannelFuture requestWrite = channel.writeAndFlush(httpRequest);
            // For chunked bodies the clean-up has to wait for the body write instead.
            final ChannelFuture lastWrite = bodyRequestEncoder.isChunked()
                ? channel.writeAndFlush(bodyRequestEncoder)
                : requestWrite;
            lastWrite.addListener(ignored -> bodyRequestEncoder.cleanFiles());
        }
    }

    /** Request without uploads: the complete HTTP request is written and flushed in one go. */
    private static final class SimpleRequest implements Request {
        private final HttpRequest httpRequest;

        SimpleRequest(HttpRequest httpRequest) {
            this.httpRequest = httpRequest;
        }

        @Override
        public void writeTo(Channel channel) {
            final Object outbound = httpRequest;
            channel.writeAndFlush(outbound);
        }
    }

    private static interface Request {
        public void writeTo(Channel var1) throws IOException;
    }
}

