package org.apache.flink.runtime.entrypoint;

import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;

/* loaded from: input_file:org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.class */
/**
 * Base class for session cluster entry points.
 *
 * <p>On start-up it creates and starts a {@link DispatcherRestEndpoint}, a {@link ResourceManager}
 * (supplied by the concrete subclass) and a {@link Dispatcher}, and wires the dispatcher leader
 * retrieval service to the gateway retriever used by the REST endpoint. On shut-down it stops
 * those components again, attempting every step even if an earlier one fails.
 */
public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
    /** Resource manager created by {@link #createResourceManager}; {@code null} until started. */
    private ResourceManager<?> resourceManager;

    /** Dispatcher created by {@link #createDispatcher}; {@code null} until started. */
    private Dispatcher dispatcher;

    /** Leader retrieval service obtained from the high availability services. */
    private LeaderRetrievalService dispatcherLeaderRetrievalService;

    /** REST endpoint created by {@link #createDispatcherRestEndpoint}; {@code null} until started. */
    private DispatcherRestEndpoint dispatcherRestEndpoint;

    /**
     * Creates the entry point.
     *
     * @param configuration configuration handed to the {@link ClusterEntrypoint} base class
     */
    public SessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    /**
     * Creates and starts the REST endpoint, the resource manager and the dispatcher, then starts
     * the dispatcher leader retrieval with the gateway retriever as its listener.
     *
     * @param configuration cluster configuration passed to every created component
     * @param rpcService RPC service used by the components; its executor is given to the REST endpoint
     * @param highAvailabilityServices source of the dispatcher leader retriever
     * @param blobServer blob server passed to the dispatcher
     * @param heartbeatServices heartbeat services passed to the resource manager and dispatcher
     * @param metricRegistry metric registry passed to the resource manager and dispatcher
     * @throws Exception if any component cannot be created or started
     */
    @Override // org.apache.flink.runtime.entrypoint.ClusterEntrypoint
    protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        // NOTE(review): 10 and 50 ms are presumably the retry count and retry delay of the
        // gateway lookup -- confirm against RpcGatewayRetriever.
        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::new,
                10,
                Time.milliseconds(50L));

        dispatcherRestEndpoint = createDispatcherRestEndpoint(configuration, dispatcherGatewayRetriever, rpcService.getExecutor());

        // The REST endpoint is started first because its address is handed to the dispatcher below.
        LOG.debug("Starting Dispatcher REST endpoint.");
        dispatcherRestEndpoint.start();

        resourceManager = createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                this);

        dispatcher = createDispatcher(
                configuration,
                rpcService,
                highAvailabilityServices,
                resourceManager.getSelfGateway(ResourceManagerGateway.class),
                blobServer,
                heartbeatServices,
                metricRegistry,
                this,
                Optional.of(dispatcherRestEndpoint.getRestAddress()));

        LOG.debug("Starting ResourceManager.");
        resourceManager.start();

        LOG.debug("Starting Dispatcher.");
        dispatcher.start();

        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    }

    /**
     * Stops the REST endpoint, the dispatcher leader retrieval service, the dispatcher and the
     * resource manager, in that order. Components that were never created are skipped.
     *
     * <p>Every step is attempted even if a previous one failed; failures are combined via
     * {@link ExceptionUtils#firstOrSuppressed} and reported once at the end.
     *
     * @param z flag supplied by {@link ClusterEntrypoint}; not used by this implementation
     * @throws FlinkException if at least one component could not be shut down cleanly
     */
    @Override // org.apache.flink.runtime.entrypoint.ClusterEntrypoint
    protected void stopClusterComponents(boolean z) throws Exception {
        Throwable failure = null;

        if (dispatcherRestEndpoint != null) {
            try {
                dispatcherRestEndpoint.shutdown(Time.seconds(10L));
            } catch (Throwable t) {
                // Guarded like the other steps so a failing REST shutdown cannot prevent the
                // remaining components from being stopped.
                failure = ExceptionUtils.firstOrSuppressed(t, failure);
            }
        }

        if (dispatcherLeaderRetrievalService != null) {
            try {
                dispatcherLeaderRetrievalService.stop();
            } catch (Throwable t) {
                failure = ExceptionUtils.firstOrSuppressed(t, failure);
            }
        }

        if (dispatcher != null) {
            try {
                dispatcher.shutDown();
            } catch (Throwable t) {
                failure = ExceptionUtils.firstOrSuppressed(t, failure);
            }
        }

        if (resourceManager != null) {
            try {
                resourceManager.shutDown();
            } catch (Throwable t) {
                failure = ExceptionUtils.firstOrSuppressed(t, failure);
            }
        }

        if (failure != null) {
            throw new FlinkException("Could not properly shut down the session cluster entry point.", failure);
        }
    }

    /**
     * Creates the REST endpoint. The server and handler configurations are both derived from
     * {@code configuration}.
     *
     * @param configuration cluster configuration
     * @param leaderGatewayRetriever retriever for the dispatcher gateway
     * @param executor executor handed to the endpoint
     * @return a new, not yet started, REST endpoint
     * @throws Exception if the endpoint or its configuration cannot be created
     */
    protected DispatcherRestEndpoint createDispatcherRestEndpoint(Configuration configuration, LeaderGatewayRetriever<DispatcherGateway> leaderGatewayRetriever, Executor executor) throws Exception {
        return new DispatcherRestEndpoint(
                RestServerEndpointConfiguration.fromConfiguration(configuration),
                leaderGatewayRetriever,
                configuration,
                RestHandlerConfiguration.fromConfiguration(configuration),
                executor);
    }

    /**
     * Creates the dispatcher; this implementation returns a {@link StandaloneDispatcher} registered
     * under {@link Dispatcher#DISPATCHER_NAME}.
     *
     * @param configuration cluster configuration
     * @param rpcService RPC service the dispatcher runs on
     * @param highAvailabilityServices high availability services
     * @param resourceManagerGateway gateway to the resource manager
     * @param blobServer blob server
     * @param heartbeatServices heartbeat services
     * @param metricRegistry metric registry
     * @param fatalErrorHandler handler for fatal errors
     * @param optional optional REST address; {@link #startClusterComponents} passes the address of
     *     the started REST endpoint
     * @return a new, not yet started, dispatcher
     * @throws Exception if the dispatcher cannot be created
     */
    protected Dispatcher createDispatcher(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> optional) throws Exception {
        return new StandaloneDispatcher(
                rpcService,
                Dispatcher.DISPATCHER_NAME,
                configuration,
                highAvailabilityServices,
                resourceManagerGateway,
                blobServer,
                heartbeatServices,
                metricRegistry,
                fatalErrorHandler,
                optional);
    }

    /**
     * Creates the resource manager for the concrete deployment.
     *
     * @param configuration cluster configuration
     * @param resourceID resource id for the resource manager; {@link #startClusterComponents}
     *     passes a freshly generated one
     * @param rpcService RPC service the resource manager runs on
     * @param highAvailabilityServices high availability services
     * @param heartbeatServices heartbeat services
     * @param metricRegistry metric registry
     * @param fatalErrorHandler handler for fatal errors
     * @return a new, not yet started, resource manager
     * @throws Exception if the resource manager cannot be created
     */
    protected abstract ResourceManager<?> createResourceManager(Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception;
}
