/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.entrypoint;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/**
 * Base class for cluster entry points.
 *
 * <p>{@link #startCluster()} configures the file system, installs the security context and then,
 * inside that context, initializes the shared services (RPC service, high-availability services,
 * blob server, heartbeat services and metric registry) before handing them to
 * {@link #startClusterComponents} which subclasses implement. {@link #shutDown(boolean)} releases
 * the same services again.
 *
 * <p>All service fields are guarded by {@link #lock}.
 */
public abstract class ClusterEntrypoint implements FatalErrorHandler {
    protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);

    /** Process exit code for a regular termination. */
    protected static final int SUCCESS_RETURN_CODE = 0;

    /** Process exit code used when {@link #startCluster()} fails. */
    protected static final int STARTUP_FAILURE_RETURN_CODE = 1;

    /** Process exit code used when {@link #onFatalError(Throwable)} is invoked. */
    protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    /** Guards the service fields below. */
    private final Object lock = new Object();

    private final Configuration configuration;

    /** Completed with {@code true} at the end of {@link #shutDown(boolean)}. */
    private final CompletableFuture<Boolean> terminationFuture;

    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    @GuardedBy("lock")
    private BlobServer blobServer;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    @GuardedBy("lock")
    private RpcService commonRpcService;

    /**
     * Creates the entry point.
     *
     * @param configuration configuration used for all services; must not be {@code null}
     */
    protected ClusterEntrypoint(Configuration configuration) {
        this.configuration = Preconditions.checkNotNull(configuration);
        this.terminationFuture = new CompletableFuture<>();
    }

    /**
     * Returns the future that is completed once {@link #shutDown(boolean)} has run.
     *
     * @return the termination future
     */
    public CompletableFuture<Boolean> getTerminationFuture() {
        return terminationFuture;
    }

    /**
     * Starts the cluster: configures file systems, installs the security context and runs
     * {@link #runCluster(Configuration)} inside that context.
     *
     * <p>Any failure is logged, a best-effort {@link #shutDown(boolean) shutDown(false)} is
     * attempted, and the JVM is terminated with {@link #STARTUP_FAILURE_RETURN_CODE}.
     */
    protected void startCluster() {
        LOG.info("Starting {}.", getClass().getSimpleName());
        try {
            configureFileSystems(configuration);
            SecurityContext securityContext = installSecurityContext(configuration);
            securityContext.runSecured((Callable<Void>) () -> {
                runCluster(configuration);
                return null;
            });
        } catch (Throwable t) {
            LOG.error("Cluster initialization failed.", t);
            try {
                shutDown(false);
            } catch (Throwable st) {
                // Best effort only: the process exits right below regardless.
                LOG.error("Could not properly shut down cluster entrypoint.", st);
            }
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    }

    /**
     * Initializes the {@link FileSystem} from the given configuration.
     *
     * @param configuration configuration passed to {@link FileSystem#initialize}
     * @throws Exception an {@link IOException} wrapping the original cause if initialization fails
     */
    protected void configureFileSystems(Configuration configuration) throws Exception {
        LOG.info("Install default filesystem.");
        try {
            FileSystem.initialize(configuration);
        } catch (IOException e) {
            throw new IOException(
                "Error while setting the default filesystem scheme from configuration.", e);
        }
    }

    /**
     * Installs the security configuration derived from {@code configuration} and returns the
     * resulting context.
     *
     * @param configuration configuration the {@link SecurityConfiguration} is built from
     * @return the installed security context
     * @throws Exception if installing the security configuration fails
     */
    protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
        LOG.info("Install security context.");
        SecurityUtils.install(new SecurityConfiguration(configuration));
        return SecurityUtils.getInstalledContext();
    }

    /**
     * Initializes the shared services and starts the cluster components while holding the lock.
     *
     * <p>After the RPC service is up, its actual address and port are written back into
     * {@code configuration} under {@link JobManagerOptions#ADDRESS} and
     * {@link JobManagerOptions#PORT} before the components are started.
     *
     * @param configuration configuration to use; it is modified as described above
     * @throws Exception if service initialization or component start-up fails
     */
    protected void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            initializeServices(configuration);

            // Publish the address/port the RPC service actually uses.
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            startClusterComponents(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry);
        }
    }

    /**
     * Creates and starts the shared services and stores them in the guarded fields.
     *
     * <p>Must be called while holding the lock (checked with an {@code assert}).
     *
     * @param configuration configuration the services are created from
     * @throws Exception if any service cannot be created or started
     */
    protected void initializeServices(Configuration configuration) throws Exception {
        assert Thread.holdsLock(lock);
        LOG.info("Initializing cluster services.");

        String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
        String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));

        commonRpcService = createRpcService(configuration, bindAddress, portRange);
        haServices = createHaServices(configuration, commonRpcService.getExecutor());
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration);

        // NOTE(review): this cast fails if createRpcService is overridden to return a
        // non-Akka RpcService.
        ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
        metricRegistry.startQueryService(actorSystem, null);
    }

    /**
     * Starts an actor system and wraps it in an {@link AkkaRpcService}.
     *
     * @param configuration configuration for the actor system and the RPC timeout
     * @param bindAddress address passed to {@link BootstrapTools#startActorSystem}
     * @param portRange port (range) passed to {@link BootstrapTools#startActorSystem}
     * @return the created RPC service
     * @throws Exception if the actor system cannot be started
     */
    protected RpcService createRpcService(
            Configuration configuration, String bindAddress, String portRange) throws Exception {
        ActorSystem actorSystem =
            BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
        FiniteDuration duration = AkkaUtils.getTimeout(configuration);
        return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
    }

    /**
     * Creates the high-availability services with
     * {@link HighAvailabilityServicesUtils.AddressResolution#NO_ADDRESS_RESOLUTION}.
     *
     * @param configuration configuration the services are created from
     * @param executor executor handed to the high-availability services
     * @return the created high-availability services
     * @throws Exception if the services cannot be created
     */
    protected HighAvailabilityServices createHaServices(
            Configuration configuration, Executor executor) throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    }

    /**
     * Creates the heartbeat services from the configuration.
     *
     * @param configuration configuration the services are created from
     * @return the created heartbeat services
     */
    protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
        return HeartbeatServices.fromConfiguration(configuration);
    }

    /**
     * Creates the metric registry from the configuration.
     *
     * @param configuration configuration the registry is created from
     * @return the created metric registry
     */
    protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
    }

    /**
     * Stops the cluster components and then every service that was initialized.
     *
     * <p>Each step runs even if an earlier one failed. Failures are accumulated with
     * {@link ExceptionUtils#firstOrSuppressed} so that none of them is discarded, and are
     * reported in a single {@link FlinkException} once all steps have run. The termination future
     * is completed with {@code true} regardless of failures.
     *
     * @param cleanupHaData if {@code true}, the high-availability services are closed via
     *     {@code closeAndCleanupAllData()}; otherwise via {@code close()}
     * @throws FlinkException if any shutdown step failed
     */
    protected void shutDown(boolean cleanupHaData) throws FlinkException {
        LOG.info("Stopping {}.", getClass().getSimpleName());
        Throwable exception = null;

        synchronized (lock) {
            try {
                stopClusterComponents(cleanupHaData);
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }

            if (metricRegistry != null) {
                try {
                    metricRegistry.shutdown();
                } catch (Throwable t) {
                    // Accumulate instead of overwriting, so a failure from
                    // stopClusterComponents is not lost.
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            if (blobServer != null) {
                try {
                    blobServer.close();
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            if (haServices != null) {
                try {
                    if (cleanupHaData) {
                        haServices.closeAndCleanupAllData();
                    } else {
                        haServices.close();
                    }
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            if (commonRpcService != null) {
                try {
                    commonRpcService.stopService();
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            terminationFuture.complete(true);
        }

        if (exception != null) {
            throw new FlinkException("Could not properly shut down the cluster services.", exception);
        }
    }

    /**
     * Logs the fatal error and terminates the JVM with {@link #RUNTIME_FAILURE_RETURN_CODE}.
     *
     * @param exception the fatal error
     */
    @Override
    public void onFatalError(Throwable exception) {
        LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
        System.exit(RUNTIME_FAILURE_RETURN_CODE);
    }

    /**
     * Starts the cluster-specific components using the already initialized services.
     *
     * <p>Called from {@link #runCluster(Configuration)} while the lock is held.
     *
     * @param configuration configuration, already updated with the RPC address and port
     * @param rpcService the common RPC service
     * @param highAvailabilityServices the high-availability services
     * @param blobServer the started blob server
     * @param heartbeatServices the heartbeat services
     * @param metricRegistry the metric registry
     * @throws Exception if the components cannot be started
     */
    protected abstract void startClusterComponents(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry) throws Exception;

    /**
     * Hook for stopping the components started in {@link #startClusterComponents}. The default
     * implementation does nothing.
     *
     * <p>Called from {@link #shutDown(boolean)} while the lock is held.
     *
     * @param cleanupHaData the flag passed to {@link #shutDown(boolean)}
     * @throws Exception if stopping the components fails
     */
    protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
    }

    /**
     * Extracts the cluster configuration from command-line arguments.
     *
     * @param args command-line arguments; the {@code configDir} parameter is read, defaulting to
     *     the empty string when absent
     * @return the parsed cluster configuration
     */
    protected static ClusterConfiguration parseArguments(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String configDir = parameterTool.get("configDir", "");
        return new ClusterConfiguration(configDir);
    }

    /**
     * Loads the global configuration from the configuration directory of the given cluster
     * configuration.
     *
     * @param clusterConfiguration provides the configuration directory
     * @return the loaded configuration
     */
    protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
        return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
    }
}

