package _ss_com.streamsets.datacollector.execution.runner.cluster;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.com.google.common.base.Optional;
import _ss_com.com.google.common.collect.ImmutableMap;
import _ss_com.com.google.common.collect.ImmutableSet;
import _ss_com.com.google.common.io.Files;
import _ss_com.fasterxml.jackson.core.JsonProcessingException;
import _ss_com.fasterxml.jackson.databind.ObjectMapper;
import _ss_com.streamsets.datacollector.blobstore.BlobStoreTask;
import _ss_com.streamsets.datacollector.bundles.SupportBundleManager;
import _ss_com.streamsets.datacollector.callback.CallbackInfo;
import _ss_com.streamsets.datacollector.callback.CallbackObjectType;
import _ss_com.streamsets.datacollector.cluster.ApplicationState;
import _ss_com.streamsets.datacollector.cluster.ClusterModeConstants;
import _ss_com.streamsets.datacollector.cluster.ClusterPipelineStatus;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.config.RuleDefinitions;
import _ss_com.streamsets.datacollector.config.StageConfiguration;
import _ss_com.streamsets.datacollector.creation.PipelineBeanCreator;
import _ss_com.streamsets.datacollector.creation.PipelineConfigBean;
import _ss_com.streamsets.datacollector.el.JobEL;
import _ss_com.streamsets.datacollector.el.PipelineEL;
import _ss_com.streamsets.datacollector.execution.AbstractRunner;
import _ss_com.streamsets.datacollector.execution.EventListenerManager;
import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.PipelineStateStore;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.Runner;
import _ss_com.streamsets.datacollector.execution.Snapshot;
import _ss_com.streamsets.datacollector.execution.SnapshotInfo;
import _ss_com.streamsets.datacollector.execution.alerts.AlertInfo;
import _ss_com.streamsets.datacollector.execution.cluster.ClusterHelper;
import _ss_com.streamsets.datacollector.execution.metrics.MetricsEventRunnable;
import _ss_com.streamsets.datacollector.execution.runner.RetryUtils;
import _ss_com.streamsets.datacollector.execution.runner.common.PipelineRunnerException;
import _ss_com.streamsets.datacollector.execution.runner.common.PipelineStopReason;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipeline;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineBuilder;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner;
import _ss_com.streamsets.datacollector.execution.runner.common.SampledRecord;
import _ss_com.streamsets.datacollector.json.ObjectMapperFactory;
import _ss_com.streamsets.datacollector.lineage.LineagePublisherTask;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.restapi.bean.IssuesJson;
import _ss_com.streamsets.datacollector.runner.Pipeline;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.runner.UserContext;
import _ss_com.streamsets.datacollector.runner.production.OffsetFileUtil;
import _ss_com.streamsets.datacollector.runner.production.SourceOffset;
import _ss_com.streamsets.datacollector.security.SecurityConfiguration;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.store.AclStoreTask;
import _ss_com.streamsets.datacollector.store.PipelineStoreException;
import _ss_com.streamsets.datacollector.store.PipelineStoreTask;
import _ss_com.streamsets.datacollector.updatechecker.UpdateChecker;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.validation.Issue;
import _ss_com.streamsets.datacollector.validation.Issues;
import _ss_com.streamsets.datacollector.validation.ValidationError;
import _ss_com.streamsets.dc.execution.manager.standalone.ResourceManager;
import _ss_com.streamsets.dc.execution.manager.standalone.ThreadUsage;
import _ss_com.streamsets.lib.security.acl.dto.Acl;
import _ss_com.streamsets.lib.security.http.RemoteSSOService;
import _ss_com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import _ss_org.apache.commons.io.FileUtils;
import com.codahale.metrics.MetricRegistry;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.ClusterSource;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import com.streamsets.pipeline.api.impl.PipelineUtils;
import com.streamsets.pipeline.api.impl.Utils;
import dagger.ObjectGraph;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/cluster/ClusterRunner.class */
public class ClusterRunner extends AbstractRunner {
    /** Key in the pipeline state attributes holding the map from which an {@code ApplicationState} is rebuilt. */
    static final String APPLICATION_STATE = "cluster.application.state";
    /** Not referenced in this part of the file; presumably the attribute key for the application start time (TODO confirm). */
    private static final String APPLICATION_STATE_START_TIME = "cluster.application.startTime";
    /** Key in the pipeline state attributes under which the distinct slave error strings are stored on RUN_ERROR. */
    static final String SLAVE_ERROR_ATTRIBUTE = "cluster.slave.error";
    /** Name of the pipeline configuration entry whose value is the list of constant key/value maps. */
    private static final String CONSTANTS_CONFIG_NAME = "constants";
    /** Map key under which a constant's name is stored inside a "constants" entry. */
    private static final String KEY = "key";
    /** Not referenced in this part of the file (the literal "value" is used directly in start()). */
    private static final String VALUE = "value";

    // Executor on which retries are scheduled; injected, or supplied directly by the test constructor.
    @Inject
    @Named("runnerExecutor")
    SafeScheduledExecutorService runnerExecutor;

    // Asked for runner resources before a pipeline is moved to STARTING.
    @Inject
    ResourceManager resourceManager;

    // Holds the callbacks received from slaves and the cluster token used with them.
    @Inject
    SlaveCallbackManager slaveCallbackManager;

    @Inject
    BlobStoreTask blobStoreTask;

    @Inject
    LineagePublisherTask lineagePublisherTask;

    @Inject
    SupportBundleManager supportBundleManager;
    // Lazily loaded from the pipeline store by getPipelineTitle().
    private String pipelineTitle;
    private ObjectGraph objectGraph;
    private ClusterHelper clusterHelper;
    // Working directory handed to ClusterHelper; created in the constructors.
    private final File tempDir;
    // Not referenced in this part of the file.
    private static final long SUBMIT_TIMEOUT_SECS = 120;
    private ScheduledFuture<?> managerRunnableFuture;
    private ScheduledFuture<?> metricRunnableFuture;
    // Set by close(); start() refuses to run once this is true.
    private volatile boolean isClosed;
    private ScheduledFuture<?> updateCheckerFuture;
    private UpdateChecker updateChecker;
    // Only obtained from the object graph when "ui.refresh.interval.ms" is positive; may therefore be null.
    private MetricsEventRunnable metricsEventRunnable;
    // Pipeline configuration loaded by start() / connectOrStart().
    private PipelineConfiguration pipelineConf;
    // Upper bound on retry attempts; -1 is treated as "no limit" by validateAndSetStateTransition.
    private int maxRetries;
    // When true, a requested RUN_ERROR transition is converted into RETRY.
    private boolean shouldRetry;
    // Handle of the scheduled retry; assigned whenever a RETRY state is saved, cancelled on stop.
    private ScheduledFuture<Void> retryFuture;
    // Initialised to -1 by both constructors.
    private long rateLimit;
    private static final Logger LOG = LoggerFactory.getLogger(ClusterRunner.class);
    /**
     * Allowed pipeline status transitions for a cluster runner: each key is a current status and
     * the value is the set of statuses that may directly follow it. Consulted by
     * {@code prepareForStart} and {@code validateAndSetStateTransition}.
     */
    private static final Map<PipelineStatus, Set<PipelineStatus>> VALID_TRANSITIONS =
        new ImmutableMap.Builder<PipelineStatus, Set<PipelineStatus>>()
            .put(PipelineStatus.EDITED, ImmutableSet.of(PipelineStatus.STARTING))
            .put(
                PipelineStatus.STARTING,
                ImmutableSet.of(
                    PipelineStatus.START_ERROR,
                    PipelineStatus.STARTING,
                    PipelineStatus.RUNNING,
                    PipelineStatus.STOPPING,
                    PipelineStatus.DISCONNECTED,
                    PipelineStatus.FINISHED))
            .put(PipelineStatus.START_ERROR, ImmutableSet.of(PipelineStatus.STARTING))
            .put(
                PipelineStatus.RUNNING,
                ImmutableSet.of(
                    PipelineStatus.CONNECT_ERROR,
                    PipelineStatus.STOPPING,
                    PipelineStatus.DISCONNECTED,
                    PipelineStatus.FINISHED,
                    PipelineStatus.KILLED,
                    PipelineStatus.RUN_ERROR,
                    PipelineStatus.RETRY))
            .put(PipelineStatus.RUN_ERROR, ImmutableSet.of(PipelineStatus.STARTING))
            .put(
                PipelineStatus.RETRY,
                ImmutableSet.of(
                    PipelineStatus.STARTING,
                    PipelineStatus.STOPPING,
                    PipelineStatus.DISCONNECTED,
                    PipelineStatus.RUN_ERROR))
            .put(
                PipelineStatus.STOPPING,
                ImmutableSet.of(
                    PipelineStatus.STOPPED,
                    PipelineStatus.CONNECT_ERROR,
                    PipelineStatus.DISCONNECTED))
            .put(PipelineStatus.FINISHED, ImmutableSet.of(PipelineStatus.STARTING))
            .put(PipelineStatus.STOPPED, ImmutableSet.of(PipelineStatus.STARTING))
            .put(PipelineStatus.KILLED, ImmutableSet.of(PipelineStatus.STARTING))
            .put(
                PipelineStatus.CONNECT_ERROR,
                ImmutableSet.of(
                    PipelineStatus.RUNNING,
                    PipelineStatus.STOPPING,
                    PipelineStatus.DISCONNECTED,
                    PipelineStatus.KILLED,
                    PipelineStatus.FINISHED,
                    PipelineStatus.RUN_ERROR,
                    PipelineStatus.RETRY))
            .put(PipelineStatus.DISCONNECTED, ImmutableSet.of(PipelineStatus.CONNECTING))
            .put(
                PipelineStatus.CONNECTING,
                ImmutableSet.of(
                    PipelineStatus.STARTING,
                    PipelineStatus.RUNNING,
                    PipelineStatus.CONNECT_ERROR,
                    PipelineStatus.RETRY,
                    PipelineStatus.FINISHED,
                    PipelineStatus.KILLED,
                    PipelineStatus.RUN_ERROR,
                    PipelineStatus.DISCONNECTED))
            .build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/cluster/ClusterRunner$ClusterSourceInfo.class */
    /**
     * Simple value holder pairing a parallelism figure with the configuration entries that
     * are to be shipped alongside it. The map is stored by reference, not copied.
     */
    public static class ClusterSourceInfo {
        private final Map<String, String> configsToShip;
        private final int parallelism;

        /**
         * @param i   the parallelism value to expose through {@link #getParallelism()}
         * @param map the configuration map to expose through {@link #getConfigsToShip()}
         */
        ClusterSourceInfo(int i, Map<String, String> map) {
            configsToShip = map;
            parallelism = i;
        }

        /** @return the parallelism supplied at construction time */
        int getParallelism() {
            return parallelism;
        }

        /** @return the very same map instance that was supplied at construction time */
        Map<String, String> getConfigsToShip() {
            return configsToShip;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/cluster/ClusterRunner$ManagerRunnable.class */
    public static class ManagerRunnable implements Runnable {
        private final ClusterRunner clusterRunner;
        private final PipelineConfiguration pipelineConf;
        private final PipelineConfigBean pipelineConfigBean;
        private final String runningUser;

        public ManagerRunnable(ClusterRunner clusterRunner, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean, String str) {
            this.clusterRunner = clusterRunner;
            this.pipelineConf = pipelineConfiguration;
            this.pipelineConfigBean = pipelineConfigBean;
            this.runningUser = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                checkStatus();
            } catch (Throwable th) {
                ClusterRunner.LOG.error("Unexpected error: " + th, th);
            }
        }

        private void checkStatus() throws PipelineStoreException, PipelineRunnerException {
            if (this.clusterRunner.getState().getStatus().isActive()) {
                this.clusterRunner.connect(this.runningUser, new ApplicationState((Map) this.clusterRunner.getState().getAttributes().get(ClusterRunner.APPLICATION_STATE)), this.pipelineConf, this.pipelineConfigBean);
            }
            if (!this.clusterRunner.getState().getStatus().isActive() || this.clusterRunner.getState().getStatus() == PipelineStatus.RETRY) {
                ClusterRunner.LOG.debug(Utils.format("Cancelling the task as the runner is in a non-active state '{}'", new Object[]{this.clusterRunner.getState()}));
                this.clusterRunner.cancelRunnable();
            }
        }
    }

    @VisibleForTesting
    ClusterRunner(String str, String str2, RuntimeInfo runtimeInfo, Configuration configuration, PipelineStoreTask pipelineStoreTask, PipelineStateStore pipelineStateStore, StageLibraryTask stageLibraryTask, SafeScheduledExecutorService safeScheduledExecutorService, ClusterHelper clusterHelper, ResourceManager resourceManager, EventListenerManager eventListenerManager, String str3, AclStoreTask aclStoreTask) {
        super(str, str2, runtimeInfo, configuration, pipelineStateStore, pipelineStoreTask, stageLibraryTask, eventListenerManager, aclStoreTask);
        this.pipelineTitle = null;
        this.rateLimit = -1L;
        this.runnerExecutor = safeScheduledExecutorService;
        this.tempDir = Files.createTempDir();
        if (clusterHelper == null) {
            this.clusterHelper = new ClusterHelper(runtimeInfo, null, this.tempDir, configuration, stageLibraryTask);
        } else {
            this.clusterHelper = clusterHelper;
        }
        this.resourceManager = resourceManager;
        this.slaveCallbackManager = new SlaveCallbackManager();
        this.slaveCallbackManager.setClusterToken(str3);
    }

    /**
     * Production constructor: injects collaborators from the object graph, recreates the
     * per-pipeline temp directory, builds the {@link ClusterHelper}, and upgrades a persisted
     * {@code CLUSTER} execution mode to a more specific mode based on the source stage.
     *
     * @param str         pipeline name
     * @param str2        pipeline revision
     * @param objectGraph graph used for member injection and for looking up the metrics runnable
     * @throws IllegalStateException if the temp directory cannot be created
     * @throws RuntimeException      if the pipeline state or configuration cannot be accessed
     */
    public ClusterRunner(String str, String str2, ObjectGraph objectGraph) {
        super(str, str2);
        this.pipelineTitle = null;
        this.rateLimit = -1L;
        this.objectGraph = objectGraph;
        this.objectGraph.inject(this);

        // <dataDir>/temp/<escaped "cluster-pipeline-name-rev">, wiped and recreated on every construction.
        File tempRoot = new File(getRuntimeInfo().getDataDir(), "temp");
        String dirName = PipelineUtils.escapedPipelineName(Utils.format("cluster-pipeline-{}-{}", str, str2));
        this.tempDir = new File(tempRoot, dirName);
        FileUtils.deleteQuietly(this.tempDir);
        boolean created = this.tempDir.mkdirs();
        if (!created) {
            throw new IllegalStateException(Utils.format("Could not create temp directory: {}", this.tempDir));
        }

        SecurityConfiguration securityConfiguration = new SecurityConfiguration(getRuntimeInfo(), getConfiguration());
        this.clusterHelper = new ClusterHelper(getRuntimeInfo(), securityConfiguration, this.tempDir, getConfiguration(), getStageLibrary());

        if (getConfiguration().get("ui.refresh.interval.ms", 2000) > 0) {
            this.metricsEventRunnable = (MetricsEventRunnable) this.objectGraph.get(MetricsEventRunnable.class);
        }

        try {
            if (getState().getExecutionMode() == ExecutionMode.CLUSTER) {
                // The source is the first stage that has no input lanes.
                String sourceName = null;
                for (StageConfiguration stage : getPipelineConf(str, str2).getStages()) {
                    if (stage.getInputLanes().isEmpty()) {
                        sourceName = stage.getStageName();
                        break;
                    }
                }
                Utils.checkNotNull(sourceName, "Source name should not be null");

                ExecutionMode upgradedMode = sourceName.contains("ClusterHdfsDSource")
                    ? ExecutionMode.CLUSTER_BATCH
                    : ExecutionMode.CLUSTER_YARN_STREAMING;
                String upgradeMessage = "Upgrading execution mode to " + upgradedMode + " from " + ExecutionMode.CLUSTER;

                // Re-save the current state unchanged except for the message and execution mode.
                PipelineState current = getState();
                getPipelineStateStore().saveState(
                    current.getUser(),
                    str,
                    str2,
                    current.getStatus(),
                    upgradeMessage,
                    current.getAttributes(),
                    upgradedMode,
                    current.getMetrics(),
                    current.getRetryAttempt(),
                    current.getNextRetryTimeStamp());
            }
        } catch (PipelineException e) {
            throw new RuntimeException("Error while accessing Pipeline State: " + e, e);
        }
    }

    /**
     * Normalises the persisted pipeline status before the data collector starts. A pipeline
     * found in an in-flight status (STARTING, RETRY, CONNECTING, RUNNING, CONNECT_ERROR,
     * STOPPING) has its start context reloaded from state and is moved to DISCONNECTED;
     * pipelines already at rest are left untouched.
     *
     * @param str user on whose behalf the state transition is recorded
     * @throws PipelineStoreException  if the state cannot be read or saved
     * @throws PipelineRunnerException if the transition to DISCONNECTED is not permitted
     * @throws IllegalStateException   if the persisted status is not one of the handled values
     */
    @Override
    public void prepareForDataCollectorStart(String str) throws PipelineStoreException, PipelineRunnerException {
        String msg;
        PipelineStatus status = getState().getStatus();
        LOG.info("Pipeline '{}::{}' has status: '{}'", getName(), getRev(), status);
        switch (status) {
            case STARTING:
                msg = "Pipeline was in STARTING state, forcing it to DISCONNECTED";
                break;
            case RETRY:
                // The transition applied below is DISCONNECTED, so the message says so as well
                // (it previously read "DISCONNECTING", which did not match the saved status).
                msg = "Pipeline was in RETRY state, forcing it to DISCONNECTED";
                break;
            case CONNECTING:
                msg = "Pipeline was in CONNECTING state, forcing it to DISCONNECTED";
                break;
            case RUNNING:
                msg = "Pipeline was in RUNNING state, forcing it to DISCONNECTED";
                break;
            case CONNECT_ERROR:
                msg = "Pipeline was in CONNECT_ERROR state, forcing it to DISCONNECTED";
                break;
            case STOPPING:
                msg = "Pipeline was in STOPPING state, forcing it to DISCONNECTED";
                break;
            case DISCONNECTED:
            case EDITED:
            case FINISHED:
            case KILLED:
            case START_ERROR:
            case STOPPED:
                // Already at rest: nothing to do.
                return;
            default:
                // NOTE(review): RUN_ERROR is a restartable status in VALID_TRANSITIONS but lands here;
                // confirm that callers never invoke this method for a pipeline in RUN_ERROR.
                throw new IllegalStateException(Utils.format("Pipeline in undefined state: '{}'", status));
        }
        LOG.debug(msg);
        loadStartPipelineContextFromState(str);
        validateAndSetStateTransition(str, PipelineStatus.DISCONNECTED, msg);
    }

    /**
     * Resumes a pipeline when the data collector starts. Only a DISCONNECTED pipeline is
     * acted upon: its start context is reloaded, it is moved to CONNECTING and then connected
     * to (or started). Any other status is logged as an error and ignored.
     *
     * @param str user on whose behalf the state transition is recorded
     */
    @Override
    public void onDataCollectorStart(String str) throws PipelineException, StageException {
        final PipelineStatus current = getState().getStatus();
        LOG.info("Pipeline '{}::{}' has status: '{}'", getName(), getRev(), current);

        switch (current) {
            case DISCONNECTED:
                break;
            default:
                LOG.error(Utils.format("Pipeline has unexpected status: '{}' on data collector start", current));
                return;
        }

        final String reason = "Pipeline was in DISCONNECTED state, changing it to CONNECTING";
        LOG.debug(reason);
        loadStartPipelineContextFromState(str);
        validateAndSetStateTransition(str, PipelineStatus.CONNECTING, reason);
        connectOrStart(getStartPipelineContext());
    }

    /**
     * Returns the pipeline title, fetching it from the pipeline store on first use and
     * caching it in a field afterwards.
     *
     * @return the pipeline title
     * @throws PipelineException if the pipeline info cannot be loaded
     */
    @Override
    public String getPipelineTitle() throws PipelineException {
        String title = this.pipelineTitle;
        if (title != null) {
            return title;
        }
        title = getPipelineStore().getInfo(getName()).getTitle();
        this.pipelineTitle = title;
        return title;
    }

    /**
     * Not available for cluster pipelines.
     *
     * @param str ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void resetOffset(String str) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Reset offset is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public SourceOffset getCommittedOffsets() {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Getting committed offsets is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @param sourceOffset ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void updateCommittedOffsets(SourceOffset sourceOffset) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Updating committed offsets is not supported in Cluster mode");
    }

    /**
     * Invoked when the data collector shuts down; delegates to {@code stopPipeline} with the
     * node-shutdown flag set, which disconnects from the pipeline rather than stopping it.
     *
     * @param str user on whose behalf the state transition is recorded
     */
    @Override
    public void onDataCollectorStop(String str) throws PipelineException {
        final boolean nodeShuttingDown = true;
        stopPipeline(str, nodeShuttingDown);
    }

    /**
     * Stops the pipeline on explicit request; delegates to {@code stopPipeline} with the
     * node-shutdown flag cleared.
     *
     * @param str user requesting the stop
     */
    @Override
    public synchronized void stop(String str) throws PipelineException {
        final boolean nodeShuttingDown = false;
        stopPipeline(str, nodeShuttingDown);
    }

    /**
     * Force-quit is not available for cluster pipelines.
     *
     * @param str ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public synchronized void forceQuit(String str) throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException {
        final String reason = "ForceQuit is not supported in Cluster mode";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Shared implementation behind {@code stop} and {@code onDataCollectorStop}.
     * <p>
     * On node shutdown any pending retry is cancelled and the pipeline is moved to
     * DISCONNECTED. Otherwise the cluster application described by the stored application
     * state is stopped. In both cases the runner's scheduled task is cancelled afterwards,
     * even when an exception is thrown.
     *
     * @param str user on whose behalf the operation is performed
     * @param z   {@code true} when the node is shutting down, {@code false} for an explicit stop
     * @throws PipelineException if the state cannot be read/saved or the stop fails
     */
    private synchronized void stopPipeline(String str, boolean z) throws PipelineException {
        try {
            if (z) {
                // retryFuture is only assigned when this instance itself saved a RETRY state, so
                // guard against null to keep shutdown from failing with a NullPointerException.
                if (getState().getStatus() == PipelineStatus.RETRY && this.retryFuture != null) {
                    this.retryFuture.cancel(true);
                }
                validateAndSetStateTransition(str, PipelineStatus.DISCONNECTED, "Node is shutting down, disconnecting from the pipeline in " + getState().getExecutionMode() + " mode");
            } else {
                stop(str, new ApplicationState((Map) getState().getAttributes().get(APPLICATION_STATE)), this.pipelineConf, PipelineBeanCreator.get().create(getPipelineConfiguration(), new ArrayList(), getStartPipelineContext().getRuntimeParameters()));
            }
        } finally {
            cancelRunnable();
        }
    }

    /**
     * Convenience accessor for the attribute map of the current pipeline state.
     *
     * @return the attributes exactly as returned by the current state
     * @throws PipelineStoreException if the state cannot be read
     */
    private Map<String, Object> getAttributes() throws PipelineStoreException {
        final PipelineState current = getState();
        return current.getAttributes();
    }

    /**
     * Either re-attaches to an already submitted cluster application or, when the stored
     * application state carries neither an application id nor an EMR config, falls back to
     * {@code retryOrStart}. A failure while connecting is recorded as CONNECT_ERROR and rethrown.
     *
     * @param startPipelineContext context supplying the user for state transitions
     */
    private void connectOrStart(Runner.StartPipelineContext startPipelineContext) throws PipelineException, StageException {
        // Work on a private copy of the stored attributes.
        Map<String, Object> attributes = new HashMap<>(getAttributes());
        ApplicationState appState = new ApplicationState((Map) attributes.get(APPLICATION_STATE));

        boolean nothingSubmitted = appState.getAppId() == null && appState.getEmrConfig() == null;
        if (nothingSubmitted) {
            retryOrStart(startPipelineContext);
            return;
        }

        try {
            this.slaveCallbackManager.setClusterToken(appState.getSdcToken());
            this.pipelineConf = getPipelineConf(getName(), getRev());
            PipelineConfigBean configBean = PipelineBeanCreator.get().create(this.pipelineConf, new ArrayList(), getStartPipelineContext().getRuntimeParameters());
            connect(startPipelineContext.getUser(), appState, this.pipelineConf, configBean);
            // Only keep polling when the connect left the pipeline in an active status.
            if (getState().getStatus().isActive()) {
                scheduleRunnable(startPipelineContext.getUser(), this.pipelineConf, configBean);
            }
        } catch (PipelineRunnerException | PipelineStoreException e) {
            validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.CONNECT_ERROR, e.toString(), attributes);
            throw e;
        }
    }

    /**
     * Starts the pipeline from scratch when no retry has been recorded; otherwise moves it
     * to RETRY so that the retry machinery takes over.
     *
     * @param startPipelineContext context supplying the user and start parameters
     */
    private void retryOrStart(Runner.StartPipelineContext startPipelineContext) throws PipelineException, StageException {
        if (getState().getRetryAttempt() == 0) {
            prepareForStart(startPipelineContext);
            start(startPipelineContext);
            return;
        }
        validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.RETRY, "Changing the state to RETRY on startup");
    }

    /**
     * Checks that the pipeline may move to STARTING, reserves runner resources, remembers
     * the start context and records the STARTING status.
     *
     * @param startPipelineContext context supplying the user and start parameters
     * @throws PipelineRunnerException if the transition is not allowed (CONTAINER_0102) or
     *                                 runner resources are unavailable (CONTAINER_0166)
     * @throws PipelineStoreException  if the state cannot be read or saved
     */
    @Override
    public void prepareForStart(Runner.StartPipelineContext startPipelineContext) throws PipelineStoreException, PipelineRunnerException {
        final PipelineState current = getState();
        final boolean mayStart = VALID_TRANSITIONS.get(current.getStatus()).contains(PipelineStatus.STARTING);
        checkState(mayStart, ContainerError.CONTAINER_0102, current.getStatus(), PipelineStatus.STARTING);

        final boolean resourcesGranted = this.resourceManager.requestRunnerResources(ThreadUsage.CLUSTER);
        if (!resourcesGranted) {
            throw new PipelineRunnerException(ContainerError.CONTAINER_0166, getName());
        }

        LOG.info("Preparing to start pipeline '{}::{}'", getName(), getRev());
        setStartPipelineContext(startPipelineContext);
        final String message = "Starting pipeline in " + getState().getExecutionMode() + " mode";
        validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.STARTING, message);
    }

    /**
     * Moves the pipeline to STOPPING. A pipeline that is waiting in RETRY has no running
     * application to stop, so its pending retry is cancelled and it goes straight through
     * STOPPING to STOPPED.
     *
     * @param str user requesting the stop
     * @throws PipelineStoreException  if the state cannot be read or saved
     * @throws PipelineRunnerException if a transition is not permitted
     */
    @Override
    public void prepareForStop(String str) throws PipelineStoreException, PipelineRunnerException {
        LOG.info("Preparing to stop pipeline '{}::{}'", getName(), getRev());
        if (getState().getStatus() != PipelineStatus.RETRY) {
            validateAndSetStateTransition(str, PipelineStatus.STOPPING, "Stopping pipeline in " + getState().getExecutionMode() + " mode");
            return;
        }
        // retryFuture is only assigned when this instance itself saved a RETRY state; guard
        // against null so that a pipeline in RETRY can always be stopped.
        if (this.retryFuture != null) {
            this.retryFuture.cancel(true);
        }
        validateAndSetStateTransition(str, PipelineStatus.STOPPING, null);
        validateAndSetStateTransition(str, PipelineStatus.STOPPED, "Stopped while the pipeline was in RETRY state");
    }

    /**
     * Starts the pipeline in one of the cluster execution modes: loads the pipeline
     * configuration, overlays runtime parameters onto the "constants" configuration, publishes
     * EL constants and hands over to {@code doStart}. Any failure is recorded as START_ERROR
     * and rethrown.
     *
     * @param startPipelineContext context supplying the user and runtime parameters
     * @throws PipelineRunnerException with VALIDATION_0073 when the execution mode is not a cluster mode
     */
    @Override
    public synchronized void start(Runner.StartPipelineContext startPipelineContext) throws PipelineException, StageException {
        try {
            Utils.checkState(!this.isClosed, Utils.formatL("Cannot start the pipeline '{}::{}' as the runner is already closed", getName(), getRev()));

            ExecutionMode mode = getState().getExecutionMode();
            boolean clusterMode = mode == ExecutionMode.CLUSTER_BATCH
                || mode == ExecutionMode.CLUSTER_YARN_STREAMING
                || mode == ExecutionMode.CLUSTER_MESOS_STREAMING
                || mode == ExecutionMode.EMR_BATCH;
            if (!clusterMode) {
                throw new PipelineRunnerException(ValidationError.VALIDATION_0073, new Object[0]);
            }

            setStartPipelineContext(startPipelineContext);
            LOG.debug("State of pipeline for '{}::{}' is '{}' ", getName(), getRev(), getState());
            this.pipelineConf = getPipelineConf(getName(), getRev());

            if (startPipelineContext.getRuntimeParameters() != null) {
                // Overwrite the value of every declared constant that has a runtime parameter of the same name.
                this.pipelineConf.getConfiguration().forEach(config -> {
                    if (!config.getName().equals(CONSTANTS_CONFIG_NAME)) {
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    List<Map<String, Object>> constants = (List<Map<String, Object>>) config.getValue();
                    for (Map<String, Object> constant : constants) {
                        String constantName = (String) constant.get(KEY);
                        if (startPipelineContext.getRuntimeParameters().containsKey(constantName)) {
                            constant.put("value", startPipelineContext.getRuntimeParameters().get(constantName));
                        }
                    }
                });
            }

            this.pipelineConf.setTestOriginStage(null);

            UserContext userContext = new UserContext(
                startPipelineContext.getUser(),
                getRuntimeInfo().isDPMEnabled(),
                getConfiguration().get(RemoteSSOService.DPM_USER_ALIAS_NAME_ENABLED, false));
            PipelineEL.setConstantsInContext(this.pipelineConf, userContext, getState().getTimeStamp());

            PipelineConfigBean configBean = PipelineBeanCreator.get().create(this.pipelineConf, new ArrayList(), getStartPipelineContext().getRuntimeParameters());
            if (configBean != null) {
                JobEL.setConstantsInContext(configBean.constants);
            }

            doStart(
                startPipelineContext.getUser(),
                this.pipelineConf,
                getClusterSourceInfo(startPipelineContext, getName(), getRev(), this.pipelineConf),
                getAcl(getName()),
                startPipelineContext.getRuntimeParameters());
        } catch (Exception e) {
            validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.START_ERROR, e.toString(), getAttributes());
            throw e;
        }
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void startAndCaptureSnapshot(Runner.StartPipelineContext startPipelineContext, String str, String str2, int i, int i2) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Start and capture snapshot is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String captureSnapshot(String str, String str2, String str3, int i, int i2) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Capture snapshot is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String updateSnapshotLabel(String str, String str2) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Update snapshot label is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @param str ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public Snapshot getSnapshot(String str) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Get snapshot is not supported in Cluster mode");
    }

    /**
     * Always reports that no snapshots exist.
     *
     * @return an empty, immutable list
     */
    @Override
    public List<SnapshotInfo> getSnapshotsInfo() {
        final List<SnapshotInfo> noSnapshots = Collections.emptyList();
        return noSnapshots;
    }

    /**
     * Not available for cluster pipelines.
     *
     * @param str ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void deleteSnapshot(String str) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Delete snapshot is not supported in Cluster mode");
    }

    /**
     * Returns the metrics aggregated by the metrics event runnable.
     *
     * @return the aggregated metrics, or {@code null} when no metrics runnable was configured
     */
    @Override
    public Object getMetrics() {
        final MetricsEventRunnable metricsSource = this.metricsEventRunnable;
        return metricsSource == null ? null : metricsSource.getAggregatedMetrics();
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<Record> getErrorRecords(String str, int i) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Get error records is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<ErrorMessage> getErrorMessages(String str, int i) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Get error messages is not supported in Cluster mode");
    }

    /**
     * Not available for cluster pipelines.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<SampledRecord> getSampledRecords(String str, int i) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Get sampled records is not supported in Cluster mode");
    }

    /**
     * Returns the callbacks of the given type that the slave callback manager currently holds.
     *
     * @param callbackObjectType type of callback to look up
     * @return the collection provided by the slave callback manager
     */
    @Override
    public Collection<CallbackInfo> getSlaveCallbackList(CallbackObjectType callbackObjectType) {
        final SlaveCallbackManager callbacks = this.slaveCallbackManager;
        return callbacks.getSlaveCallbackList(callbackObjectType);
    }

    /**
     * Not available for cluster pipelines.
     *
     * @param str ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean deleteAlert(String str) {
        // Carry a message, as forceQuit does, so callers can tell which operation was rejected.
        throw new UnsupportedOperationException("Delete alert is not supported in Cluster mode");
    }

    /**
     * Always reports that no alerts exist.
     *
     * @return an empty, immutable list
     */
    @Override
    public List<AlertInfo> getAlerts() {
        final List<AlertInfo> noAlerts = Collections.emptyList();
        return noAlerts;
    }

    /**
     * Marks this runner as closed. After this call {@code start} rejects further start
     * attempts; nothing else is released here.
     */
    @Override
    public void close() {
        isClosed = true;
    }

    /**
     * Convenience overload that builds the attribute map itself: freshly created state
     * attributes overlaid with the attributes of the current state (the current state wins
     * on key clashes), then delegates to the four-argument overload.
     *
     * @param str            user on whose behalf the transition is recorded
     * @param pipelineStatus requested target status
     * @param str2           message to store with the new state; may be {@code null}
     */
    private void validateAndSetStateTransition(String str, PipelineStatus pipelineStatus, String str2) throws PipelineStoreException, PipelineRunnerException {
        final Map<String, Object> mergedAttributes = createStateAttributes();
        final Map<String, Object> currentAttributes = getAttributes();
        mergedAttributes.putAll(currentAttributes);
        validateAndSetStateTransition(str, pipelineStatus, str2, mergedAttributes);
    }

    /**
     * Validates the requested status change against {@code VALID_TRANSITIONS}, applies the
     * retry bookkeeping, persists the new state and finally notifies state listeners.
     *
     * @param str            user on whose behalf the transition is recorded
     * @param pipelineStatus requested target status; may be rewritten to RETRY or RUN_ERROR below
     * @param str2           message to store with the new state
     * @param map            attributes to store with the new state; must not be {@code null}
     * @throws PipelineRunnerException if the transition is not permitted (CONTAINER_0102)
     * @throws PipelineStoreException  if the state cannot be read or saved, or the metrics
     *                                 cannot be serialised (CONTAINER_0210)
     */
    @VisibleForTesting
    void validateAndSetStateTransition(String str, PipelineStatus pipelineStatus, String str2, Map<String, Object> map) throws PipelineStoreException, PipelineRunnerException {
        PipelineState state;
        PipelineState saveState;
        Utils.checkState(map != null, "Attributes cannot be set to null");
        PipelineState state2 = getState();
        // A repeated request for the current status is a no-op, except for STARTING.
        if (state2.getStatus() == pipelineStatus && pipelineStatus != PipelineStatus.STARTING) {
            LOG.debug(Utils.format("Ignoring status '{}' as this is same as current status", new Object[]{state2.getStatus()}));
            return;
        }
        synchronized (this) {
            // Re-read the state under the lock; this is the "from" state used for validation and broadcast.
            state = getState();
            checkState(VALID_TRANSITIONS.get(state.getStatus()).contains(pipelineStatus), ContainerError.CONTAINER_0102, state.getStatus(), pipelineStatus);
            long nextRetryTimeStamp = state.getNextRetryTimeStamp();
            int retryAttempt = state.getRetryAttempt();
            if (pipelineStatus == PipelineStatus.RUN_ERROR) {
                // Copies the distinct slave error messages into the attributes and clears them from the manager.
                handleErrorCallbackFromSlaves(map);
            }
            // With retries enabled a RUN_ERROR becomes a RETRY, which must itself be a valid transition.
            if (pipelineStatus == PipelineStatus.RUN_ERROR && this.shouldRetry) {
                pipelineStatus = PipelineStatus.RETRY;
                checkState(VALID_TRANSITIONS.get(state.getStatus()).contains(pipelineStatus), ContainerError.CONTAINER_0102, state.getStatus(), pipelineStatus);
            }
            // Entering RETRY from anything but CONNECTING counts as a new attempt.
            if (pipelineStatus == PipelineStatus.RETRY && state.getStatus() != PipelineStatus.CONNECTING) {
                retryAttempt = state.getRetryAttempt() + 1;
                // maxRetries == -1 means the number of attempts is not limited.
                if (retryAttempt <= this.maxRetries || this.maxRetries == -1) {
                    nextRetryTimeStamp = RetryUtils.getNextRetryTimeStamp(retryAttempt, System.currentTimeMillis());
                } else {
                    // Attempts exhausted: fall back to RUN_ERROR and reset the retry bookkeeping.
                    LOG.info("Retry attempt '{}' is greater than max no of retries '{}'", Integer.valueOf(retryAttempt), Integer.valueOf(this.maxRetries));
                    pipelineStatus = PipelineStatus.RUN_ERROR;
                    retryAttempt = 0;
                    nextRetryTimeStamp = 0;
                }
            } else if (!pipelineStatus.isActive()) {
                // Any non-active target status resets the retry bookkeeping.
                retryAttempt = 0;
                nextRetryTimeStamp = 0;
            }
            ObjectMapper objectMapper = ObjectMapperFactory.get();
            String str3 = null;
            // Metrics are persisted only when the pipeline goes inactive or is being disconnected.
            if (!pipelineStatus.isActive() || pipelineStatus == PipelineStatus.DISCONNECTED) {
                Object metrics = getMetrics();
                if (metrics != null) {
                    try {
                        str3 = objectMapper.writer().writeValueAsString(metrics);
                    } catch (JsonProcessingException e) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0210, e.toString(), e);
                    }
                }
                // No live metrics available: keep whatever metrics the stored state already has.
                if (str3 == null) {
                    str3 = getState().getMetrics();
                }
            }
            saveState = getPipelineStateStore().saveState(str, getName(), getRev(), pipelineStatus, str2, map, getState().getExecutionMode(), str3, retryAttempt, nextRetryTimeStamp);
            if (pipelineStatus == PipelineStatus.RETRY) {
                // Remember the handle so that a later stop can cancel the pending retry.
                this.retryFuture = scheduleForRetries(this.runnerExecutor);
            }
        }
        // Listeners are notified after the synchronized block has been left.
        if (getEventListenerManager() != null) {
            getEventListenerManager().broadcastStateChange(state, saveState, ThreadUsage.CLUSTER, OffsetFileUtil.getOffsets(getRuntimeInfo(), getName(), getRev()));
        }
    }

    /**
     * Gathers the distinct error payloads currently held for slave ERROR callbacks, logs each
     * distinct payload once, and stores the resulting set in the supplied attribute map.
     *
     * <p>The ERROR callback list of the slave callback manager is cleared afterwards, whether or
     * not {@code map} was provided.
     *
     * @param map attribute map that receives the set of error payloads under
     *            {@code SLAVE_ERROR_ATTRIBUTE}; when {@code null} the payloads are only logged
     */
    @VisibleForTesting
    void handleErrorCallbackFromSlaves(Map<String, Object> map) {
        HashSet<String> slaveErrors = new HashSet<>();
        for (CallbackInfo slaveCallback : this.slaveCallbackManager.getSlaveCallbackList(CallbackObjectType.ERROR)) {
            String errorPayload = slaveCallback.getCallbackObject();
            // add() reports whether the payload was new, so duplicates are logged only once.
            boolean firstOccurrence = slaveErrors.add(errorPayload);
            if (firstOccurrence) {
                LOG.error("Error in Slave Runner:" + errorPayload);
            }
        }
        if (map != null) {
            map.put(SLAVE_ERROR_ATTRIBUTE, slaveErrors);
        }
        this.slaveCallbackManager.clearSlaveList(CallbackObjectType.ERROR);
    }

    /**
     * Throws a {@link PipelineRunnerException} when the given condition is false.
     *
     * @param z              condition that must hold
     * @param containerError error code used to build the exception
     * @param objArr         arguments passed to the exception together with the error code
     * @throws PipelineRunnerException if {@code z} is {@code false}
     */
    private void checkState(boolean z, ContainerError containerError, Object... objArr) throws PipelineRunnerException {
        if (z) {
            return;
        }
        throw new PipelineRunnerException(containerError, objArr);
    }

    /**
     * Passes the given callback info on to the slave callback manager.
     *
     * @param callbackInfo callback info handed to {@code slaveCallbackManager}
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void updateSlaveCallbackInfo(CallbackInfo callbackInfo) {
        this.slaveCallbackManager.updateSlaveCallbackInfo(callbackInfo);
    }

    /**
     * Creates a production pipeline for the given configuration, initializes it, and reads the
     * parallelism and the configs to ship from its source.
     *
     * <p>If initialization reports issues, the pipeline state is moved to {@code START_ERROR}
     * (with the issues stored under the {@code "issues"} attribute) and a
     * {@link PipelineRuntimeException} built from the first issue is thrown. The pipeline is
     * always destroyed with {@code PipelineStopReason.UNUSED} before this method exits.
     *
     * @param startPipelineContext  start context supplying the user and interceptor configurations
     * @param str                   first identifier forwarded to {@code createProductionPipeline}
     *                              (presumably the pipeline name — confirm against callers)
     * @param str2                  second identifier forwarded to {@code createProductionPipeline}
     *                              (presumably the revision — confirm against callers)
     * @param pipelineConfiguration configuration the pipeline is built from
     * @return the source's parallelism together with its configs to ship
     * @throws PipelineRuntimeException if initialization reports issues, the parallelism is below
     *                                  1, or reading the source information fails
     */
    @VisibleForTesting
    ClusterSourceInfo getClusterSourceInfo(Runner.StartPipelineContext startPipelineContext, String str, String str2, PipelineConfiguration pipelineConfiguration) throws PipelineRuntimeException, StageException, PipelineStoreException, PipelineRunnerException {
        ProductionPipeline productionPipeline = createProductionPipeline(startPipelineContext, str, str2, pipelineConfiguration);
        Pipeline pipeline = productionPipeline.getPipeline();
        try {
            List<Issue> initIssues = pipeline.init(false);
            if (!initIssues.isEmpty()) {
                // Build the exception first, then record START_ERROR, then surface the failure.
                PipelineRuntimeException initFailure = new PipelineRuntimeException(ContainerError.CONTAINER_0800, str, initIssues.get(0).getMessage());
                Map<String, Object> attributes = new HashMap<>();
                attributes.putAll(getAttributes());
                attributes.put("issues", new IssuesJson(new Issues(initIssues)));
                validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.START_ERROR, initIssues.get(0).getMessage(), attributes);
                throw initFailure;
            }

            ClusterSource source = productionPipeline.getPipeline().getSource();
            if (!(source instanceof ClusterSource)) {
                String sourceClassName = source.getClass().getName();
                throw new RuntimeException(Utils.format("Stage '{}' does not implement '{}'", new Object[]{sourceClassName, ClusterSource.class.getName()}));
            }

            try {
                int parallelism = source.getParallelism();
                if (parallelism >= 1) {
                    return new ClusterSourceInfo(parallelism, source.getConfigsToShip());
                }
                throw new PipelineRuntimeException(ContainerError.CONTAINER_0112, new Object[0]);
            } catch (IOException | StageException sourceError) {
                throw new PipelineRuntimeException(ContainerError.CONTAINER_0117, sourceError.toString(), sourceError);
            }
        } finally {
            pipeline.destroy(false, PipelineStopReason.UNUSED);
        }
    }

    /**
     * Assembles a {@link ProductionPipeline} for the given configuration.
     *
     * <p>A new {@link ProductionPipelineRunner} with its own {@code MetricRegistry} is created;
     * the runner's rate limit is applied only when {@code rateLimit} is positive.
     *
     * @param startPipelineContext  start context supplying the user and interceptor configurations
     * @param str                   first identifier passed to the runner and the builder
     *                              (presumably the pipeline name — confirm)
     * @param str2                  second identifier passed to the runner and the builder
     *                              (presumably the revision — confirm)
     * @param pipelineConfiguration configuration handed to the builder
     * @return the pipeline produced by {@code ProductionPipelineBuilder.build}
     */
    private ProductionPipeline createProductionPipeline(Runner.StartPipelineContext startPipelineContext, String str, String str2, PipelineConfiguration pipelineConfiguration) throws PipelineStoreException, PipelineRuntimeException, StageException {
        ProductionPipelineRunner pipelineRunner = new ProductionPipelineRunner(
            str,
            str2,
            this.supportBundleManager,
            getConfiguration(),
            getRuntimeInfo(),
            new MetricRegistry(),
            null,
            null);
        if (this.rateLimit > 0) {
            pipelineRunner.setRateLimit(Long.valueOf(this.rateLimit));
        }

        ProductionPipelineBuilder builder = new ProductionPipelineBuilder(
            str,
            str2,
            getConfiguration(),
            getRuntimeInfo(),
            getStageLibrary(),
            pipelineRunner,
            null,
            this.blobStoreTask,
            this.lineagePublisherTask);

        UserContext userContext = new UserContext(
            startPipelineContext.getUser(),
            getRuntimeInfo().isDPMEnabled(),
            getConfiguration().get(RemoteSSOService.DPM_USER_ALIAS_NAME_ENABLED, false));

        return builder.build(
            userContext,
            pipelineConfiguration,
            getState().getTimeStamp(),
            startPipelineContext.getInterceptorConfigurations(),
            null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the cluster helper for the status of the submitted application and transitions the
     * pipeline state to match it.
     *
     * <p>Any failure while fetching the status is logged and recorded as {@code CONNECT_ERROR};
     * nothing else happens in that case. Otherwise:
     * <ul>
     *   <li>{@code RUNNING} moves the pipeline to {@code RUNNING};</li>
     *   <li>{@code FAILED} terminates with {@code START_ERROR} if the pipeline was still
     *       {@code STARTING}, else {@code RUN_ERROR};</li>
     *   <li>{@code KILLED} terminates with {@code START_ERROR} if the pipeline was still
     *       {@code STARTING}, else {@code KILLED};</li>
     *   <li>{@code SUCCEEDED} terminates with {@code FINISHED};</li>
     *   <li>any other status leaves the state untouched.</li>
     * </ul>
     *
     * @param str                   user passed along with every state transition
     * @param applicationState      handle of the application whose status is checked
     * @param pipelineConfiguration pipeline configuration forwarded to the cluster helper
     * @param pipelineConfigBean    pipeline config bean forwarded to the cluster helper
     */
    public void connect(String str, ApplicationState applicationState, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) throws PipelineStoreException, PipelineRunnerException {
        final ClusterPipelineStatus clusterStatus;
        try {
            clusterStatus = this.clusterHelper.getStatus(applicationState, pipelineConfiguration, pipelineConfigBean);
        } catch (IOException ioError) {
            String message = "IO Error while trying to check the status of pipeline: " + ioError;
            LOG.error(message, ioError);
            validateAndSetStateTransition(str, PipelineStatus.CONNECT_ERROR, message);
            return;
        } catch (TimeoutException timeout) {
            String message = "Timedout while trying to check the status of pipeline: " + timeout;
            LOG.error(message, timeout);
            validateAndSetStateTransition(str, PipelineStatus.CONNECT_ERROR, message);
            return;
        } catch (Exception unexpected) {
            String message = "Error getting status of pipeline: " + unexpected;
            LOG.error(message, unexpected);
            validateAndSetStateTransition(str, PipelineStatus.CONNECT_ERROR, message);
            return;
        }

        // Snapshot of the status before any transition: a pipeline that never left STARTING
        // is reported as a start failure rather than a run failure / kill.
        PipelineStatus statusBeforeTransition = getState().getStatus();
        boolean stillStarting = statusBeforeTransition == PipelineStatus.STARTING;

        if (clusterStatus == ClusterPipelineStatus.RUNNING) {
            validateAndSetStateTransition(str, PipelineStatus.RUNNING, "Connected to pipeline in cluster mode");
        } else if (clusterStatus == ClusterPipelineStatus.FAILED) {
            String message = "Pipeline failed in cluster";
            LOG.debug(message);
            PipelineStatus terminalStatus = stillStarting ? PipelineStatus.START_ERROR : PipelineStatus.RUN_ERROR;
            postTerminate(str, applicationState, pipelineConfiguration, pipelineConfigBean, terminalStatus, message);
        } else if (clusterStatus == ClusterPipelineStatus.KILLED) {
            String message = "Pipeline killed in cluster";
            LOG.debug(message);
            PipelineStatus terminalStatus = stillStarting ? PipelineStatus.START_ERROR : PipelineStatus.KILLED;
            postTerminate(str, applicationState, pipelineConfiguration, pipelineConfigBean, terminalStatus, message);
        } else if (clusterStatus == ClusterPipelineStatus.SUCCEEDED) {
            String message = "Pipeline succeeded in cluster";
            LOG.debug(message);
            postTerminate(str, applicationState, pipelineConfiguration, pipelineConfigBean, PipelineStatus.FINISHED, message);
        }
    }

    /**
     * Cleans up after a terminated cluster application and records the final pipeline status.
     *
     * <p>Cleanup through the cluster helper is best-effort: an {@code IOException} or
     * {@code StageException} is logged and otherwise ignored. If the application state carries a
     * directory id, that directory is deleted. The state transition is saved with the current
     * attributes minus {@code APPLICATION_STATE} and {@code APPLICATION_STATE_START_TIME}.
     *
     * @param str                   user passed along with the state transition
     * @param applicationState      application to clean up
     * @param pipelineConfiguration pipeline configuration forwarded to the cluster helper
     * @param pipelineConfigBean    pipeline config bean forwarded to the cluster helper
     * @param pipelineStatus        status to transition to
     * @param str2                  message stored with the transition
     */
    private void postTerminate(String str, ApplicationState applicationState, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean, PipelineStatus pipelineStatus, String str2) throws PipelineStoreException, PipelineRunnerException {
        LOG.info("Cleaning up application");
        try {
            this.clusterHelper.cleanUp(applicationState, pipelineConfiguration, pipelineConfigBean);
        } catch (IOException | StageException cleanupError) {
            // Deliberately not rethrown: the status transition below must still happen.
            LOG.error("Error cleaning up application: {}", cleanupError, cleanupError);
        }

        Optional<String> applicationDirId = applicationState.getDirId();
        if (applicationDirId.isPresent()) {
            deleteDir(applicationDirId.get());
        }

        Map<String, Object> remainingAttributes = new HashMap<>(getAttributes());
        remainingAttributes.remove(APPLICATION_STATE);
        remainingAttributes.remove(APPLICATION_STATE_START_TIME);
        validateAndSetStateTransition(str, pipelineStatus, str2, remainingAttributes);
    }

    /**
     * Quietly deletes the named entry located under the runtime data directory.
     *
     * @param str name of the file or directory, relative to {@code getRuntimeInfo().getDataDir()}
     */
    private void deleteDir(String str) {
        File target = new File(getRuntimeInfo().getDataDir(), str);
        FileUtils.deleteQuietly(target);
    }

    /**
     * Submits the pipeline to the cluster and records the resulting state.
     *
     * <p>Steps, in order: validate the inputs, reset slave metrics, create the
     * {@link PipelineConfigBean} (capturing retry and rate-limit settings), register the email and
     * webhook notifiers, assemble the key/value information shipped with the submission, submit
     * through the cluster helper, store the application state in the pipeline attributes,
     * transition to {@code STARTING} (no application id yet) or {@code RUNNING}, and finally
     * schedule the background runnables.
     *
     * <p>Any exception raised along the way is logged and converted into a {@code START_ERROR}
     * transition instead of being propagated.
     *
     * @param str                   user starting the pipeline; also shipped under
     *                              {@code CLUSTER_PIPELINE_USER}
     * @param pipelineConfiguration configuration of the pipeline to submit; must not be null
     * @param clusterSourceInfo     parallelism and configs to ship; parallelism must not be zero
     * @param acl                   ACL forwarded to the cluster helper
     * @param map                   third argument forwarded to {@code PipelineBeanCreator.create}
     *                              (presumably runtime parameters — confirm)
     */
    private synchronized void doStart(String str, PipelineConfiguration pipelineConfiguration, ClusterSourceInfo clusterSourceInfo, Acl acl, Map<String, Object> map) throws PipelineStoreException, PipelineRunnerException {
        try {
            Utils.checkNotNull(pipelineConfiguration, "PipelineConfiguration cannot be null");
            Utils.checkState(clusterSourceInfo.getParallelism() != 0, "Parallelism cannot be zero");
            if (this.metricsEventRunnable != null) {
                this.metricsEventRunnable.clearSlaveMetrics();
            }

            List<Issue> beanIssues = new ArrayList<>();
            PipelineConfigBean configBean = PipelineBeanCreator.get().create(pipelineConfiguration, beanIssues, map);
            if (configBean == null) {
                throw new PipelineRunnerException(ContainerError.CONTAINER_0116, beanIssues);
            }
            this.maxRetries = configBean.retryAttempts;
            this.shouldRetry = configBean.shouldRetry;
            this.rateLimit = configBean.rateLimit;
            registerEmailNotifierIfRequired(configBean, getName(), pipelineConfiguration.getTitle(), getRev());
            registerWebhookNotifierIfRequired(configBean, getName(), pipelineConfiguration.getTitle(), getRev());

            // Key/value information handed to the cluster helper together with the submission.
            Map<String, String> sourceInfo = new HashMap<>();
            File bootstrapDir = new File(getRuntimeInfo().getLibexecDir(), "bootstrap-libs");
            sourceInfo.put(ClusterModeConstants.NUM_EXECUTORS_KEY, String.valueOf(clusterSourceInfo.getParallelism()));
            sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_NAME, getName());
            sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_TITLE, pipelineConfiguration.getTitle());
            sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_REV, getRev());
            sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_USER, str);
            sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_REMOTE, String.valueOf(isRemotePipeline()));
            for (Map.Entry<String, String> shipped : clusterSourceInfo.getConfigsToShip().entrySet()) {
                String key = shipped.getKey();
                String value = shipped.getValue();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Config to ship " + key + " = " + value);
                }
                sourceInfo.put(key, value);
            }

            getRuntimeInfo().setAttribute(ClusterModeConstants.NUM_EXECUTORS_KEY, Integer.valueOf(clusterSourceInfo.getParallelism()));
            this.slaveCallbackManager.clearSlaveList();

            ApplicationState submitted = this.clusterHelper.submit(
                pipelineConfiguration,
                configBean,
                getStageLibrary(),
                getCredentialStores(),
                new File(getRuntimeInfo().getConfigDir()),
                new File(getRuntimeInfo().getResourcesDir()),
                new File(getRuntimeInfo().getStaticWebDir()),
                bootstrapDir,
                sourceInfo,
                SUBMIT_TIMEOUT_SECS,
                getRules(),
                acl);

            Map<String, Object> stateAttributes = createStateAttributes();
            stateAttributes.putAll(getAttributes());
            stateAttributes.put(APPLICATION_STATE, submitted.getMap());
            stateAttributes.put(APPLICATION_STATE_START_TIME, Long.valueOf(System.currentTimeMillis()));
            this.slaveCallbackManager.setClusterToken(submitted.getSdcToken());

            // Without an application id the submission is still pending, hence STARTING.
            PipelineStatus statusAfterSubmit;
            if (submitted.getAppId() == null) {
                statusAfterSubmit = PipelineStatus.STARTING;
            } else {
                statusAfterSubmit = PipelineStatus.RUNNING;
            }
            String submitMessage = Utils.format("Pipeline in cluster is running ({})", new Object[]{submitted.getAppId()});
            validateAndSetStateTransition(str, statusAfterSubmit, submitMessage, stateAttributes);
            scheduleRunnable(str, pipelineConfiguration, configBean);
        } catch (IOException ioError) {
            String message = "IO Error while trying to start the pipeline: " + ioError;
            LOG.error(message, ioError);
            validateAndSetStateTransition(str, PipelineStatus.START_ERROR, message);
        } catch (TimeoutException timeout) {
            String message = "Timedout while trying to start the pipeline: " + timeout;
            LOG.error(message, timeout);
            validateAndSetStateTransition(str, PipelineStatus.START_ERROR, message);
        } catch (Exception unexpected) {
            String message = "Unexpected error starting pipeline: " + unexpected;
            LOG.error(message, unexpected);
            validateAndSetStateTransition(str, PipelineStatus.START_ERROR, message);
        }
    }

    /**
     * Schedules the periodic background tasks on {@code runnerExecutor}.
     *
     * <ul>
     *   <li>a new {@link UpdateChecker}: first run after 1 minute, then every 1440 minutes;</li>
     *   <li>the metrics event runnable, only when one is configured: immediately, then at the
     *       delay it reports in milliseconds;</li>
     *   <li>a new {@code ManagerRunnable}: immediately, then every 30 seconds.</li>
     * </ul>
     *
     * @param str                   value passed as the last argument of {@code ManagerRunnable}
     *                              (the user, as passed by {@code doStart})
     * @param pipelineConfiguration configuration given to the update checker and manager runnable
     * @param pipelineConfigBean    config bean given to the manager runnable
     */
    private void scheduleRunnable(String str, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) {
        UpdateChecker checker = new UpdateChecker(getRuntimeInfo(), getConfiguration(), pipelineConfiguration, this);
        this.updateChecker = checker;
        this.updateCheckerFuture = this.runnerExecutor.scheduleAtFixedRate(checker, 1L, 1440L, TimeUnit.MINUTES);

        if (this.metricsEventRunnable != null) {
            this.metricRunnableFuture = this.runnerExecutor.scheduleAtFixedRate(
                this.metricsEventRunnable,
                0L,
                this.metricsEventRunnable.getScheduledDelay(),
                TimeUnit.MILLISECONDS);
        }

        ManagerRunnable managerRunnable = new ManagerRunnable(this, pipelineConfiguration, pipelineConfigBean, str);
        this.managerRunnableFuture = this.runnerExecutor.scheduleAtFixedRate(managerRunnable, 0L, 30L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Cancels whichever of the scheduled background tasks have been started.
     *
     * <p>The metrics and update-checker futures are cancelled with interruption allowed, while
     * the manager future is cancelled without interrupting a run in progress. Slave metrics are
     * cleared when the metrics future is cancelled.
     */
    public void cancelRunnable() {
        if (metricRunnableFuture != null) {
            metricRunnableFuture.cancel(true);
            metricsEventRunnable.clearSlaveMetrics();
        }

        if (managerRunnableFuture != null) {
            // mayInterruptIfRunning = false: an in-flight manager run is allowed to complete.
            managerRunnableFuture.cancel(false);
        }

        if (updateCheckerFuture != null) {
            updateCheckerFuture.cancel(true);
        }
    }

    /**
     * Kills the cluster application and, on success, finishes the pipeline as {@code STOPPED}.
     *
     * <p>If the kill fails for any reason the error is logged, the pipeline is moved to
     * {@code CONNECT_ERROR}, and no termination handling is performed. On success the
     * application state used for termination is rebuilt from the {@code APPLICATION_STATE}
     * attribute of the current pipeline state rather than taken from the argument.
     *
     * @param str                   user passed along with the state transitions
     * @param applicationState      application to kill; must not be null
     * @param pipelineConfiguration pipeline configuration forwarded to the cluster helper
     * @param pipelineConfigBean    pipeline config bean forwarded to the cluster helper
     */
    private synchronized void stop(String str, ApplicationState applicationState, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) throws PipelineStoreException, PipelineRunnerException {
        Utils.checkState(applicationState != null, "Application state cannot be null");
        try {
            this.clusterHelper.kill(applicationState, pipelineConfiguration, pipelineConfigBean);
        } catch (IOException ioError) {
            String message = "IO Error while trying to stop the pipeline: " + ioError;
            LOG.error(message, ioError);
            validateAndSetStateTransition(str, PipelineStatus.CONNECT_ERROR, message);
            return;
        } catch (TimeoutException timeout) {
            String message = "Timedout while trying to stop the pipeline: " + timeout;
            LOG.error(message, timeout);
            validateAndSetStateTransition(str, PipelineStatus.CONNECT_ERROR, message);
            return;
        } catch (Exception unexpected) {
            String message = "Unexpected error stopping pipeline: " + unexpected;
            LOG.error(message, unexpected);
            validateAndSetStateTransition(str, PipelineStatus.CONNECT_ERROR, message);
            return;
        }

        Object storedApplicationState = getState().getAttributes().get(APPLICATION_STATE);
        ApplicationState persistedState = new ApplicationState((Map) storedApplicationState);
        postTerminate(str, persistedState, pipelineConfiguration, pipelineConfigBean, PipelineStatus.STOPPED, "Stopped cluster pipeline");
    }

    /**
     * Returns the update info reported by the current update checker.
     *
     * <p>NOTE(review): the only visible assignment of {@code updateChecker} is in
     * {@code scheduleRunnable}; if nothing else initializes it, calling this before the runnables
     * are scheduled would throw a {@code NullPointerException} — confirm.
     *
     * @return whatever {@code updateChecker.getUpdateInfo()} returns
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Map getUpdateInfo() {
        return this.updateChecker.getUpdateInfo();
    }

    /**
     * Retrieves the rule definitions for this runner's name and revision from the pipeline store.
     *
     * @return the rule definitions returned by the pipeline store
     * @throws PipelineException declared by this method; presumably propagated from the store
     *                           lookup — confirm
     */
    RuleDefinitions getRules() throws PipelineException {
        return getPipelineStore().retrieveRules(getName(), getRev());
    }

    /**
     * Returns the cluster token currently held by the slave callback manager.
     *
     * @return the value of {@code slaveCallbackManager.getClusterToken()}
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getToken() {
        return this.slaveCallbackManager.getClusterToken();
    }

    /**
     * Reports the runner count for this runner.
     *
     * @return always {@code 1}
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public int getRunnerCount() {
        return 1;
    }

    /**
     * Reports the delegating runner for this runner.
     *
     * @return always {@code null}
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Runner getDelegatingRunner() {
        return null;
    }
}
