package org.apache.flink.client.program.rest;

import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExecutorUtils;

/* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClient.class */
public class RestClusterClient extends ClusterClient {
    private final RestClusterClientConfiguration restClusterClientConfiguration;
    private final RestClient restClient;
    private final ExecutorService executorService;

    public RestClusterClient(Configuration configuration) throws Exception {
        this(configuration, RestClusterClientConfiguration.fromConfiguration(configuration));
    }

    public RestClusterClient(Configuration configuration, RestClusterClientConfiguration restClusterClientConfiguration) throws Exception {
        super(configuration);
        this.executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
        this.restClusterClientConfiguration = restClusterClientConfiguration;
        this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), this.executorService);
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public void shutdown() {
        try {
            super.shutdown();
        } catch (Exception e) {
            this.log.error("An error occurred during the client shutdown.", e);
        }
        this.restClient.shutdown(Time.seconds(5L));
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.executorService});
    }

    @Override // org.apache.flink.client.program.ClusterClient
    protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        this.log.info("Submitting job.");
        try {
            jobGraph.setAllowQueuedScheduling(true);
            submitJob(jobGraph);
            return new JobExecutionResult(jobGraph.getJobID(), 1L, Collections.emptyMap());
        } catch (JobSubmissionException e) {
            throw new ProgramInvocationException((Throwable) e);
        }
    }

    private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
        this.log.info("Requesting blob server port.");
        try {
            int i = ((BlobServerPortResponseBody) this.restClient.sendRequest(this.restClusterClientConfiguration.getRestServerAddress(), this.restClusterClientConfiguration.getRestServerPort(), BlobServerPortHeaders.getInstance()).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS)).port;
            this.log.info("Uploading jar files.");
            try {
                Iterator it = BlobClient.uploadJarFiles(new InetSocketAddress(this.restClusterClientConfiguration.getBlobServerAddress(), i), this.flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()).iterator();
                while (it.hasNext()) {
                    jobGraph.addBlob((PermanentBlobKey) it.next());
                }
                this.log.info("Submitting job graph.");
                try {
                    this.restClient.sendRequest(this.restClusterClientConfiguration.getRestServerAddress(), this.restClusterClientConfiguration.getRestServerPort(), JobSubmitHeaders.getInstance(), new JobSubmitRequestBody(jobGraph)).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    throw new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", e);
                }
            } catch (Exception e2) {
                throw new JobSubmissionException(jobGraph.getJobID(), "Failed to upload user jars to blob server.", e2);
            }
        } catch (Exception e3) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Failed to retrieve blob server port.", e3);
        }
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public void stop(JobID jobID) throws Exception {
        JobTerminationMessageParameters jobTerminationMessageParameters = new JobTerminationMessageParameters();
        jobTerminationMessageParameters.jobPathParameter.resolve(jobID);
        jobTerminationMessageParameters.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
        this.restClient.sendRequest(this.restClusterClientConfiguration.getRestServerAddress(), this.restClusterClientConfiguration.getRestServerPort(), JobTerminationHeaders.getInstance(), jobTerminationMessageParameters).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public void cancel(JobID jobID) throws Exception {
        JobTerminationMessageParameters jobTerminationMessageParameters = new JobTerminationMessageParameters();
        jobTerminationMessageParameters.jobPathParameter.resolve(jobID);
        jobTerminationMessageParameters.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
        this.restClient.sendRequest(this.restClusterClientConfiguration.getRestServerAddress(), this.restClusterClientConfiguration.getRestServerPort(), JobTerminationHeaders.getInstance(), jobTerminationMessageParameters).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public String cancelWithSavepoint(JobID jobID, @Nullable String str) throws Exception {
        throw new UnsupportedOperationException("Not implemented yet.");
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<String> triggerSavepoint(JobID jobID, @Nullable String str) throws Exception {
        SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
        SavepointMessageParameters unresolvedMessageParameters = savepointTriggerHeaders.getUnresolvedMessageParameters();
        unresolvedMessageParameters.jobID.resolve(jobID);
        if (str != null) {
            unresolvedMessageParameters.targetDirectory.resolve(Collections.singletonList(str));
        }
        return this.restClient.sendRequest(this.restClusterClientConfiguration.getRestServerAddress(), this.restClusterClientConfiguration.getRestServerPort(), savepointTriggerHeaders, unresolvedMessageParameters).thenApply(savepointTriggerResponseBody -> {
            return savepointTriggerResponseBody.location;
        });
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
        return this.restClient.sendRequest(this.restClusterClientConfiguration.getRestServerAddress(), this.restClusterClientConfiguration.getRestServerPort(), CurrentJobsOverviewHandlerHeaders.getInstance()).thenApply(multipleJobsDetails -> {
            ArrayList arrayList = new ArrayList();
            multipleJobsDetails.getRunning().forEach(jobDetails -> {
                arrayList.add(new JobStatusMessage(jobDetails.getJobId(), jobDetails.getJobName(), jobDetails.getStatus(), jobDetails.getStartTime()));
            });
            multipleJobsDetails.getFinished().forEach(jobDetails2 -> {
                arrayList.add(new JobStatusMessage(jobDetails2.getJobId(), jobDetails2.getJobName(), jobDetails2.getStatus(), jobDetails2.getStartTime()));
            });
            return arrayList;
        });
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public String getClusterIdentifier() {
        return "Flip-6 Standalone cluster with dispatcher at " + this.restClusterClientConfiguration.getRestServerAddress() + '.';
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public boolean hasUserJarsInClassPath(List<URL> list) {
        return false;
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public void waitForClusterToBeReady() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public String getWebInterfaceURL() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public GetClusterStatusResponse getClusterStatus() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.client.program.ClusterClient
    protected List<String> getNewMessages() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.client.program.ClusterClient
    protected void finalizeCluster() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.client.program.ClusterClient
    public int getMaxSlots() {
        return 0;
    }
}
