/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.TreeNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST client that serializes requests to JSON with Jackson, sends them over HTTP through a
 * Netty {@link Bootstrap}, and hands back the parsed response as a {@link CompletableFuture}.
 *
 * <p>Every call to {@code sendRequest} opens a new connection via {@code bootstrap.connect} and
 * sets the {@code Connection: close} header; the channel is closed once a message has been read.
 * Connecting and response parsing run on the {@link Executor} passed to the constructor.
 */
public class RestClient {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    /** Maximum content length (1 MiB) handed to the {@link HttpObjectAggregator}. */
    private static final int MAX_RESPONSE_CONTENT_LENGTH = 1024 * 1024;

    private final Executor executor;
    private Bootstrap bootstrap;

    /**
     * Creates the client and its Netty bootstrap backed by a single-threaded NIO event loop.
     *
     * @param configuration source of the optional {@link SSLEngine}; must not be null
     * @param executor executor used for connecting and for parsing responses; must not be null
     */
    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);

        final SSLEngine sslEngine = configuration.getSslEngine();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // NOTE(review): the same SSLEngine instance is handed to every new channel,
                // although an SSLEngine is normally bound to one connection - confirm this is
                // intended before relying on SSL with more than one request.
                if (sslEngine != null) {
                    socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngine));
                }
                socketChannel.pipeline()
                    .addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(MAX_RESPONSE_CONTENT_LENGTH))
                    .addLast(new ClientHandler())
                    .addLast(new PipelineErrorHandler(LOG));
            }
        };

        NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-client-netty"));
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(group).channel(NioSocketChannel.class).handler(initializer);
        LOG.info("Rest client endpoint started.");
    }

    /**
     * Shuts down the event loop group and waits up to {@code timeout} for it to finish.
     * Failures are logged, never thrown.
     *
     * @param timeout used both as the graceful-shutdown timeout and as the maximum wait time
     */
    public void shutdown(Time timeout) {
        LOG.info("Shutting down rest endpoint.");
        CompletableFuture<Void> groupFuture = new CompletableFuture<>();
        if (this.bootstrap != null && this.bootstrap.group() != null) {
            this.bootstrap.group()
                .shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                .addListener(finished -> {
                    // Forward the real outcome instead of always reporting success.
                    if (finished.isSuccess()) {
                        groupFuture.complete(null);
                    } else {
                        groupFuture.completeExceptionally(finished.cause());
                    }
                });
        } else {
            // Nothing to shut down; complete right away so the wait below does not run into
            // the timeout and report a bogus failure.
            groupFuture.complete(null);
        }
        try {
            groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
        catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    /**
     * Sends a request that has message parameters but an empty body.
     *
     * @see #sendRequest(String, int, MessageHeaders, MessageParameters, RequestBody)
     */
    public <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters) throws IOException {
        return this.sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, EmptyRequestBody.getInstance());
    }

    /**
     * Sends a request that has a body but no message parameters.
     *
     * @see #sendRequest(String, int, MessageHeaders, MessageParameters, RequestBody)
     */
    public <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, R request) throws IOException {
        return this.sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), request);
    }

    /**
     * Sends a request with neither message parameters nor a body.
     *
     * @see #sendRequest(String, int, MessageHeaders, MessageParameters, RequestBody)
     */
    public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders) throws IOException {
        return this.sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
    }

    /**
     * Serializes {@code request} to JSON, builds an HTTP/1.1 request for the URL resolved from
     * {@code messageHeaders} and {@code messageParameters}, and submits it.
     *
     * @param targetAddress host to connect to; must not be null
     * @param targetPort port to connect to; must satisfy {@code 0 <= targetPort < 65536}
     * @param messageHeaders provides the endpoint URL, HTTP method and response class
     * @param messageParameters resolved parameters substituted into the URL
     * @param request body that is serialized to JSON
     * @return future holding the parsed response, or failing if sending or parsing fails
     * @throws IOException if the request body cannot be serialized
     * @throws IllegalArgumentException if the port is out of range
     * @throws IllegalStateException if the message parameters are not resolved
     */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
        Preconditions.checkNotNull(targetAddress);
        // The message now states the range that is actually enforced.
        Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range [0, 65535].");
        Preconditions.checkNotNull(messageHeaders);
        Preconditions.checkNotNull(request);
        Preconditions.checkNotNull(messageParameters);
        Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");

        String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
        LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);

        StringWriter sw = new StringWriter();
        objectMapper.writeValue(sw, request);
        ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));

        DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
        // readableBytes() is the number of bytes that will be written as the body.
        httpRequest.headers()
            .add("Content-Length", payload.readableBytes())
            .add("Content-Type", RestConstants.REST_CONTENT_TYPE)
            .set("Host", targetAddress + ':' + targetPort)
            .set("Connection", "close");

        return this.submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
    }

    /**
     * Connects to the target, writes the request, and parses the response received by the
     * channel's {@link ClientHandler}.
     *
     * @return future with the parsed response; fails if connecting, writing, reading or parsing
     *     fails
     */
    private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
        return CompletableFuture.supplyAsync(() -> this.bootstrap.connect(targetAddress, targetPort), this.executor)
            .thenApply(connectFuture -> {
                try {
                    // Blocks the executor thread until the connect attempt has finished.
                    return connectFuture.sync();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status before translating to an unchecked exception.
                    Thread.currentThread().interrupt();
                    throw new FlinkRuntimeException(e);
                }
            })
            .thenApply(ChannelFuture::channel)
            .thenCompose(channel -> {
                ClientHandler handler = channel.pipeline().get(ClientHandler.class);
                CompletableFuture<JsonResponse> future = handler.getJsonFuture();
                channel.writeAndFlush(httpRequest).addListener(writeFuture -> {
                    // A failed write would otherwise leave the response future pending forever.
                    if (!writeFuture.isSuccess()) {
                        future.completeExceptionally(writeFuture.cause());
                    }
                });
                return future;
            })
            .thenComposeAsync(rawResponse -> RestClient.parseResponse(rawResponse, responseClass), this.executor);
    }

    /**
     * Converts the raw JSON into {@code responseClass}. If that fails, the JSON is interpreted as
     * an {@link ErrorResponseBody} and reported as a {@link RestClientException}; if that fails
     * as well, the future fails with a {@link RestClientException} describing the mismatch.
     *
     * @return an already completed future (normally or exceptionally)
     */
    private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) {
        CompletableFuture<P> responseFuture = new CompletableFuture<>();
        try {
            P response = objectMapper.treeToValue(rawResponse.getJson(), responseClass);
            responseFuture.complete(response);
        }
        catch (JsonProcessingException jpe) {
            // Not the expected type; check whether the server sent an error response instead.
            try {
                ErrorResponseBody error = objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class);
                responseFuture.completeExceptionally(new RestClientException(error.errors.toString(), rawResponse.getHttpResponseStatus()));
            }
            catch (JsonProcessingException jpe2) {
                LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
                responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2, rawResponse.getHttpResponseStatus()));
            }
        }
        return responseFuture;
    }

    /** Parsed JSON body of a response together with its HTTP status. */
    private static final class JsonResponse {
        private final JsonNode json;
        private final HttpResponseStatus httpResponseStatus;

        private JsonResponse(JsonNode json, HttpResponseStatus httpResponseStatus) {
            this.json = Preconditions.checkNotNull(json);
            this.httpResponseStatus = Preconditions.checkNotNull(httpResponseStatus);
        }

        public JsonNode getJson() {
            return this.json;
        }

        public HttpResponseStatus getHttpResponseStatus() {
            return this.httpResponseStatus;
        }

        /** Renders payload and status so that log statements show the actual response. */
        @Override
        public String toString() {
            return "JsonResponse{json=" + this.json + ", httpResponseStatus=" + this.httpResponseStatus + '}';
        }
    }

    /**
     * Inbound handler, one instance per channel, that completes {@link #getJsonFuture()} with the
     * parsed response or with the failure that prevented one from being read.
     */
    private static class ClientHandler
    extends SimpleChannelInboundHandler<Object> {
        private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture<>();

        private ClientHandler() {
        }

        /** Returns the future that is completed once a response or a failure has been observed. */
        CompletableFuture<JsonResponse> getJsonFuture() {
            return this.jsonFuture;
        }

        /**
         * Parses a {@link FullHttpResponse}; any other message type fails the future. The channel
         * is closed afterwards in either case.
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof FullHttpResponse) {
                this.readRawResponse((FullHttpResponse) msg);
            } else {
                final String error = "Implementation error: Received a response that wasn't a FullHttpResponse.";
                LOG.error(error);
                // Use the status of the message if it carries one, otherwise report a server error.
                HttpResponseStatus status = msg instanceof HttpResponse
                    ? ((HttpResponse) msg).getStatus()
                    : HttpResponseStatus.INTERNAL_SERVER_ERROR;
                this.jsonFuture.completeExceptionally(new RestClientException(error, status));
            }
            ctx.close();
        }

        /**
         * Fails the response future when the channel becomes inactive. This has no effect if the
         * future was already completed by {@link #channelRead0}, so it only matters when the
         * connection goes away before any response was read.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            this.jsonFuture.completeExceptionally(new IOException("Channel became inactive before a response was received."));
            super.channelInactive(ctx);
        }

        /**
         * Reads the response content as a JSON tree and completes the future with it, or fails
         * the future if the content cannot be read or is not valid JSON.
         */
        private void readRawResponse(FullHttpResponse msg) {
            JsonNode rawResponse;
            ByteBuf content = msg.content();
            try {
                ByteBufInputStream in = new ByteBufInputStream(content);
                rawResponse = objectMapper.readTree((InputStream) in);
                LOG.debug("Received response {}.", rawResponse);
            }
            catch (JsonParseException je) {
                LOG.error("Response was not valid JSON.", je);
                // Rewind so the body can be re-read as plain text for the error message.
                content.readerIndex(0);
                try {
                    ByteBufInputStream in = new ByteBufInputStream(content);
                    byte[] data = new byte[in.available()];
                    in.readFully(data);
                    // Decode with the charset used for requests, not the platform default.
                    String message = new String(data, ConfigConstants.DEFAULT_CHARSET);
                    LOG.error("Unexpected plain-text response: {}", message);
                    this.jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, but plain-text: " + message, je, msg.getStatus()));
                }
                catch (IOException e) {
                    this.jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, nor plain-text.", je, msg.getStatus()));
                }
                return;
            }
            catch (IOException ioe) {
                LOG.error("Response could not be read.", ioe);
                this.jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe, msg.getStatus()));
                return;
            }
            if (rawResponse == null) {
                // Presumably happens for an empty body with some Jackson versions; fail the
                // future instead of letting the JsonResponse constructor throw.
                this.jsonFuture.completeExceptionally(new RestClientException("Response did not contain any JSON content.", msg.getStatus()));
                return;
            }
            this.jsonFuture.complete(new JsonResponse(rawResponse, msg.getStatus()));
        }
    }
}

