package _ss_com.streamsets.datacollector.execution.manager.standalone;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.com.google.common.cache.Cache;
import _ss_com.com.google.common.cache.CacheBuilder;
import _ss_com.streamsets.datacollector.event.handler.remote.RemoteDataCollector;
import _ss_com.streamsets.datacollector.execution.EventListenerManager;
import _ss_com.streamsets.datacollector.execution.Manager;
import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.PipelineStateStore;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.PreviewStatus;
import _ss_com.streamsets.datacollector.execution.Previewer;
import _ss_com.streamsets.datacollector.execution.PreviewerListener;
import _ss_com.streamsets.datacollector.execution.Runner;
import _ss_com.streamsets.datacollector.execution.StateEventListener;
import _ss_com.streamsets.datacollector.execution.StatsCollectorRunner;
import _ss_com.streamsets.datacollector.execution.manager.PipelineManagerException;
import _ss_com.streamsets.datacollector.execution.manager.PreviewerProvider;
import _ss_com.streamsets.datacollector.execution.manager.RunnerProvider;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.metrics.MetricsCache;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.security.GroupsInScope;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.store.PipelineInfo;
import _ss_com.streamsets.datacollector.store.PipelineStoreException;
import _ss_com.streamsets.datacollector.store.PipelineStoreTask;
import _ss_com.streamsets.datacollector.task.AbstractTask;
import _ss_com.streamsets.datacollector.usagestats.StatsCollector;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.validation.ValidationError;
import _ss_com.streamsets.dc.execution.manager.standalone.ResourceManager;
import _ss_com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import _ss_com.streamsets.pipeline.lib.util.ExceptionUtils;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.impl.Utils;
import dagger.ObjectGraph;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/manager/standalone/StandaloneAndClusterPipelineManager.class */
public class StandaloneAndClusterPipelineManager extends AbstractTask implements Manager, PreviewerListener {
    private static final Logger LOG = LoggerFactory.getLogger(StandaloneAndClusterPipelineManager.class);
    // Task name passed to the AbstractTask constructor.
    private static final String PIPELINE_MANAGER = "PipelineManager";
    // Dagger graph that injects the @Inject fields below; also handed to the runner and previewer providers.
    private final ObjectGraph objectGraph;

    // Supplies the metrics registry for the caches/JMX and the DPM-enabled flag checked at start-up.
    @Inject
    RuntimeInfo runtimeInfo;

    // Source of the "runner.*" settings declared further down.
    @Inject
    Configuration configuration;

    // Used to check that a pipeline exists and to list all pipelines.
    @Inject
    PipelineStoreTask pipelineStore;

    // Persisted pipeline state: status, execution mode, user and attributes are read from it.
    @Inject
    PipelineStateStore pipelineStateStore;

    // Injected but not referenced by the code visible in this class.
    @Inject
    StageLibraryTask stageLibrary;

    // Injected but not referenced by the code visible in this class.
    @Inject
    @Named("previewExecutor")
    SafeScheduledExecutorService previewExecutor;

    // Injected but not referenced by the code visible in this class.
    @Inject
    @Named("runnerExecutor")
    SafeScheduledExecutorService runnerExecutor;

    // Runs the periodic sweep that removes inactive runners from the runner cache.
    @Inject
    @Named("managerExecutor")
    SafeScheduledExecutorService managerExecutor;

    // Factory for Runner instances.
    @Inject
    RunnerProvider runnerProvider;

    // Factory for Previewer instances.
    @Inject
    PreviewerProvider previewerProvider;

    // Registered as a state event listener in the constructor.
    @Inject
    ResourceManager resourceManager;

    // Registry that state event listeners are added to.
    @Inject
    EventListenerManager eventListenerManager;

    // Passed to the StatsCollectorRunner that wraps every runner created here.
    @Inject
    StatsCollector statsCollector;
    // Runners keyed by "<name>::<rev>"; assigned in runTask(), so it is null before the task runs.
    private Cache<String, RunnerInfo> runnerCache;
    // Previewers keyed by previewer id; assigned in runTask(), so it is null before the task runs.
    private Cache<String, Previewer> previewerCache;
    // Default period between runner expiry sweeps, in milliseconds (one hour).
    static final long DEFAULT_RUNNER_EXPIRY_INTERVAL = 3600000;
    // Configuration key for the period between runner expiry sweeps.
    static final String RUNNER_EXPIRY_INTERVAL = "runner.expiry.interval";
    // Default delay before the first runner expiry sweep, in milliseconds (thirty minutes).
    static final long DEFAULT_RUNNER_EXPIRY_INITIAL_DELAY = 1800000;
    // Configuration key for the delay before the first runner expiry sweep.
    static final String RUNNER_EXPIRY_INITIAL_DELAY = "runner.expiry.initial.delay";
    // Default for RUNNER_RESTART_PIPELINES.
    static final boolean DEFAULT_RUNNER_RESTART_PIPELINES = true;
    // Configuration key read in runTask(); when true, active pipelines whose runner reports DISCONNECTED are started.
    static final String RUNNER_RESTART_PIPELINES = "runner.boot.pipeline.restart";
    // Effective sweep period and initial delay (milliseconds), resolved once in the constructor.
    private final long runnerExpiryInterval;
    private final long runnerExpiryInitialDelay;
    // Handle of the scheduled expiry sweep; assigned at the end of runTask() and cancelled in stopTask().
    private ScheduledFuture<?> runnerExpiryFuture;
    // Separator placed between pipeline name and revision when building runner cache keys.
    private static final String NAME_AND_REV_SEPARATOR = "::";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:_ss_com/streamsets/datacollector/execution/manager/standalone/StandaloneAndClusterPipelineManager$RunnerInfo.class */
    /**
     * Value stored in the runner cache: a runner together with the execution mode
     * that was recorded for it.
     */
    public static class RunnerInfo {
        /** The cached runner; never reassigned. */
        private final Runner runner;
        /** Execution mode recorded for the runner; the enclosing manager may overwrite it. */
        private ExecutionMode executionMode;

        /**
         * @param cachedRunner runner to hold
         * @param mode execution mode to record alongside the runner
         */
        private RunnerInfo(Runner cachedRunner, ExecutionMode mode) {
            runner = cachedRunner;
            executionMode = mode;
        }
    }

    public StandaloneAndClusterPipelineManager(ObjectGraph objectGraph) {
        super(PIPELINE_MANAGER);
        this.objectGraph = objectGraph;
        this.objectGraph.inject(this);
        this.runnerExpiryInterval = this.configuration.get(RUNNER_EXPIRY_INTERVAL, 3600000L);
        this.runnerExpiryInitialDelay = this.configuration.get(RUNNER_EXPIRY_INITIAL_DELAY, 1800000L);
        this.eventListenerManager.addStateEventListener(this.resourceManager);
        MetricsConfigurator.registerJmxMetrics(this.runtimeInfo.getMetrics());
    }

    /**
     * Registers a state event listener by delegating to the injected event listener manager.
     *
     * @param stateEventListener listener to register
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public void addStateEventListener(StateEventListener stateEventListener) {
        final EventListenerManager listeners = eventListenerManager;
        listeners.addStateEventListener(stateEventListener);
    }

    /**
     * Creates a previewer through the previewer provider and stores it in the previewer
     * cache under its own id.
     *
     * @param str forwarded unchanged to the provider (NOTE(review): presumably the requesting user - confirm)
     * @param str2 pipeline identifier; must exist in the pipeline store
     * @param str3 forwarded unchanged to the provider (NOTE(review): presumably the revision - confirm)
     * @return the newly created previewer
     * @throws PipelineException with {@code CONTAINER_0200} when the store has no pipeline {@code str2}
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public Previewer createPreviewer(String str, String str2, String str3) throws PipelineException {
        if (pipelineStore.hasPipeline(str2)) {
            final Previewer created = previewerProvider.createPreviewer(str, str2, str3, this, objectGraph);
            previewerCache.put(created.getId(), created);
            return created;
        }
        throw new PipelineStoreException(ContainerError.CONTAINER_0200, str2);
    }

    /**
     * Looks up a previewer in the previewer cache.
     *
     * @param str previewer id; checked for null via {@code Utils.checkNotNull}
     * @return the cached previewer, or {@code null} (after logging a warning) when none is cached
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public Previewer getPreviewer(String str) {
        Utils.checkNotNull(str, "previewerId");
        final Previewer cached = previewerCache.getIfPresent(str);
        if (cached != null) {
            return cached;
        }
        LOG.warn("Cannot find the previewer in cache for id: '{}'", str);
        return null;
    }

    /**
     * Returns the runner for a pipeline, creating and caching one when none is cached.
     *
     * <p>If the cached entry's execution mode is {@code CLUSTER} it is overwritten with the
     * mode currently held by the state store. If the cached mode then still differs from the
     * stored mode, the cached runner is removed (only when inactive) and the lookup is retried.
     *
     * @param str pipeline identifier; must exist in the pipeline store
     * @param str2 pipeline revision
     * @return the runner for the pipeline
     * @throws PipelineException with {@code CONTAINER_0200} when the pipeline is unknown,
     *     with {@code VALIDATION_0082} when the execution mode changed but the cached runner
     *     is still active, or with {@code CONTAINER_0114} when loading the runner failed
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public Runner getRunner(String str, String str2) throws PipelineException {
        if (!pipelineStore.hasPipeline(str)) {
            throw new PipelineStoreException(ContainerError.CONTAINER_0200, str);
        }

        // Only the cache load can raise the checked ExecutionException, so the try is scoped to it.
        final RunnerInfo cached;
        try {
            cached = runnerCache.get(getNameAndRevString(str, str2), () -> {
                ExecutionMode storedMode = pipelineStateStore.getState(str, str2).getExecutionMode();
                return new RunnerInfo(getRunner(str, str2, storedMode), storedMode);
            });
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof PipelineStoreException) {
                throw (PipelineStoreException) cause;
            }
            throw new PipelineStoreException(ContainerError.CONTAINER_0114, e.toString(), e);
        }

        final ExecutionMode cachedMode = cached.executionMode;
        final ExecutionMode persistedMode = pipelineStateStore.getState(str, str2).getExecutionMode();
        if (cachedMode == ExecutionMode.CLUSTER) {
            LOG.info("Upgrading execution mode from " + ExecutionMode.CLUSTER + " to " + persistedMode);
            cached.executionMode = persistedMode;
        }

        // The stored mode is re-read on purpose: it is compared against the (possibly updated) cached mode.
        if (cached.executionMode != pipelineStateStore.getState(str, str2).getExecutionMode()) {
            LOG.info(Utils.format("Invalidate the existing runner for pipeline '{}::{}' as execution mode has changed", str, str2));
            if (!removeRunnerIfNotActive(cached.runner)) {
                throw new PipelineManagerException(
                    ValidationError.VALIDATION_0082,
                    pipelineStateStore.getState(str, str2).getExecutionMode(),
                    cached.executionMode);
            }
            return getRunner(str, str2);
        }
        return cached.runner;
    }

    /**
     * Returns the stored state of the last revision of every pipeline in the pipeline store.
     *
     * @return one state per pipeline, in the order the pipeline store lists them
     * @throws PipelineStoreException if the pipeline store or state store lookup fails
     * @throws IllegalStateException presumably raised by {@code Utils.checkState} when a
     *     pipeline has no stored state - NOTE(review): confirm the exception type
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public List<PipelineState> getPipelines() throws PipelineStoreException {
        final List<PipelineInfo> pipelineInfos = this.pipelineStore.getPipelines();
        // Typed and presized: exactly one state is added per pipeline.
        final List<PipelineState> states = new ArrayList<>(pipelineInfos.size());
        for (PipelineInfo pipelineInfo : pipelineInfos) {
            final String pipelineId = pipelineInfo.getPipelineId();
            final String lastRev = pipelineInfo.getLastRev();
            final PipelineState state = this.pipelineStateStore.getState(pipelineId, lastRev);
            Utils.checkState(state != null, Utils.format("State for pipeline: '{}::{}' doesn't exist", pipelineId, lastRev));
            states.add(state);
        }
        return states;
    }

    /**
     * Reads the state of a pipeline revision straight from the state store.
     *
     * @param str pipeline identifier
     * @param str2 pipeline revision
     * @return whatever the state store returns for that pipeline and revision
     * @throws PipelineStoreException if the state store lookup fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public PipelineState getPipelineState(String str, String str2) throws PipelineStoreException {
        final PipelineState storedState = pipelineStateStore.getState(str, str2);
        return storedState;
    }

    /**
     * Reports whether a cached runner exists for the pipeline and its current status is active.
     *
     * @param str pipeline identifier; must exist in the pipeline store
     * @param str2 pipeline revision
     * @return {@code false} when no runner is cached, otherwise the runner status' active flag
     * @throws PipelineException with {@code CONTAINER_0200} when the pipeline is unknown
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public boolean isPipelineActive(String str, String str2) throws PipelineException {
        if (!pipelineStore.hasPipeline(str)) {
            throw new PipelineStoreException(ContainerError.CONTAINER_0200, str);
        }
        final RunnerInfo cached = runnerCache.getIfPresent(getNameAndRevString(str, str2));
        if (cached == null) {
            return false;
        }
        return cached.runner.getState().getStatus().isActive();
    }

    /**
     * Starts the manager: creates the previewer and runner caches, prepares a runner for every
     * pipeline whose stored status is active, and schedules the periodic sweep that removes
     * inactive runners from the runner cache.
     *
     * <p>A runner is cached and started only when restart is enabled through
     * {@code runner.boot.pipeline.restart} and the runner reports {@code DISCONNECTED} after
     * being prepared. Remote pipelines are skipped while DPM is disabled. A failure while
     * handling one pipeline is logged and does not stop the remaining pipelines.
     *
     * @throws RuntimeException if the list of pipelines cannot be loaded
     */
    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    public void runTask() {
        // Previewers idle for 30 minutes are dropped; the listener logs every removal.
        this.previewerCache = new MetricsCache(this.runtimeInfo.getMetrics(), "manager-previewer-cache", CacheBuilder.newBuilder().expireAfterAccess(30L, TimeUnit.MINUTES).removalListener(removalNotification -> {
            Previewer previewer = (Previewer) removalNotification.getValue();
            LOG.warn("Evicting idle previewer '{}::{}'::'{}' in status '{}'", new Object[]{previewer.getName(), previewer.getRev(), previewer.getId(), previewer.getStatus()});
        }).build());
        // Runners never expire by themselves; the scheduled sweep below removes inactive ones.
        this.runnerCache = new MetricsCache(this.runtimeInfo.getMetrics(), "manager-runner-cache", CacheBuilder.newBuilder().build());
        final boolean restartPipelines = this.configuration.get(RUNNER_RESTART_PIPELINES, DEFAULT_RUNNER_RESTART_PIPELINES);
        try {
            for (PipelineInfo pipelineInfo : this.pipelineStore.getPipelines()) {
                final String pipelineId = pipelineInfo.getPipelineId();
                final String lastRev = pipelineInfo.getLastRev();
                try {
                    if (isRemotePipeline(pipelineId, lastRev) && !this.runtimeInfo.isDPMEnabled()) {
                        LOG.info(Utils.format("Not activating remote pipeline'{}:{}' as DPM is disabled ", pipelineId, lastRev));
                        continue;
                    }
                    final PipelineState state = this.pipelineStateStore.getState(pipelineId, lastRev);
                    if (!state.getStatus().isActive()) {
                        continue;
                    }
                    final ExecutionMode executionMode = state.getExecutionMode();
                    final Runner runner = getRunner(pipelineId, lastRev, executionMode);
                    runner.prepareForDataCollectorStart(state.getUser());
                    if (restartPipelines && runner.getState().getStatus() == PipelineStatus.DISCONNECTED) {
                        this.runnerCache.put(getNameAndRevString(pipelineId, lastRev), new RunnerInfo(runner, executionMode));
                        try {
                            final String user = state.getUser();
                            GroupsInScope.executeIgnoreGroups(() -> {
                                runner.onDataCollectorStart(user);
                                return null;
                            });
                        } catch (Exception e) {
                            // Rethrow the underlying failure; fall back to the exception itself when it
                            // carries no cause, so a null is never thrown and the real error is not lost.
                            final Throwable cause = e.getCause();
                            ExceptionUtils.throwUndeclared(cause != null ? cause : e);
                        }
                    }
                } catch (Exception perPipelineFailure) {
                    // Deliberately best-effort: one broken pipeline must not block the others.
                    LOG.error(Utils.format("Error while processing pipeline '{}::{}'", pipelineId, lastRev), perPipelineFailure);
                }
            }
            this.runnerExpiryFuture = this.managerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    for (RunnerInfo cachedInfo : StandaloneAndClusterPipelineManager.this.runnerCache.asMap().values()) {
                        final Runner cachedRunner = cachedInfo.runner;
                        try {
                            StandaloneAndClusterPipelineManager.LOG.debug("Runner for pipeline '{}::{}' is in status: '{}'", cachedRunner.getName(), cachedRunner.getRev(), cachedRunner.getState());
                            StandaloneAndClusterPipelineManager.this.removeRunnerIfNotActive(cachedRunner);
                        } catch (PipelineStoreException sweepFailure) {
                            StandaloneAndClusterPipelineManager.LOG.warn("Cannot remove runner for pipeline: '{}::{}' due to '{}'", cachedRunner.getName(), cachedRunner.getRev(), sweepFailure.toString(), sweepFailure);
                        }
                    }
                }
            }, this.runnerExpiryInitialDelay, this.runnerExpiryInterval, TimeUnit.MILLISECONDS);
        } catch (PipelineStoreException e3) {
            throw new RuntimeException("Cannot load the list of pipelines from StateStore", e3);
        }
    }

    /**
     * Test hook: reports whether the runner cache currently holds an entry for the pipeline.
     *
     * @param str pipeline identifier
     * @param str2 pipeline revision
     * @return {@code true} when a runner is cached under {@code str::str2}
     */
    @VisibleForTesting
    boolean isRunnerPresent(String str, String str2) {
        final String cacheKey = getNameAndRevString(str, str2);
        final RunnerInfo cached = runnerCache.getIfPresent(cacheKey);
        return cached != null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes a runner from the runner cache and closes it, unless its current status is active.
     *
     * @param runner runner to evict
     * @return {@code true} when the runner was removed and closed, {@code false} when it is
     *     still active and was left untouched
     * @throws PipelineStoreException if the runner state cannot be read
     */
    public boolean removeRunnerIfNotActive(Runner runner) throws PipelineStoreException {
        if (runner.getState().getStatus().isActive()) {
            return false;
        }
        this.runnerCache.invalidate(getNameAndRevString(runner.getName(), runner.getRev()));
        runner.close();
        // Quoting fixed: the previous pattern had a stray quote and rendered as '<name>::'<rev>'.
        LOG.info("Removing runner for pipeline '{}::{}'", runner.getName(), runner.getRev());
        return true;
    }

    /**
     * Stops the manager: closes and stops every cached runner, stops every cached previewer,
     * empties both caches and cancels the scheduled runner expiry sweep.
     *
     * <p>Failures while stopping an individual runner or previewer are logged and do not
     * prevent the remaining ones from being stopped.
     */
    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    public void stopTask() {
        for (RunnerInfo cachedInfo : this.runnerCache.asMap().values()) {
            final Runner runner = cachedInfo.runner;
            try {
                runner.close();
                runner.onDataCollectorStop(this.pipelineStateStore.getState(runner.getName(), runner.getRev()).getUser());
            } catch (Exception e) {
                LOG.warn("Failed to stop the runner for pipeline: {} and rev: {} due to: {}", runner.getName(), runner.getRev(), e.toString(), e);
            }
        }
        this.runnerCache.invalidateAll();
        for (Previewer previewer : this.previewerCache.asMap().values()) {
            try {
                previewer.stop();
            } catch (Exception e2) {
                LOG.warn("Failed to stop the previewer: {}::{}::{} due to: {}", previewer.getName(), previewer.getRev(), previewer.getId(), e2.toString(), e2);
            }
        }
        this.previewerCache.invalidateAll();
        // The sweep is only scheduled at the very end of runTask(); if start-up failed before
        // that point the future is still null and must not be dereferenced.
        if (this.runnerExpiryFuture != null) {
            this.runnerExpiryFuture.cancel(true);
        }
        LOG.info("Stopped Production Pipeline Manager");
    }

    /**
     * Previewer callback for a status change; the manager only logs it at debug level.
     *
     * @param str previewer id
     * @param previewStatus the status the previewer changed to
     */
    @Override // _ss_com.streamsets.datacollector.execution.PreviewerListener
    public void statusChange(String str, PreviewStatus previewStatus) {
        final String previewerId = str;
        final PreviewStatus newStatus = previewStatus;
        LOG.debug("Status of previewer with id: '{}' changed to status: '{}'", previewerId, newStatus);
    }

    /**
     * Previewer callback fired once the preview output has been retrieved; the previewer is
     * then dropped from the previewer cache.
     *
     * @param str previewer id
     */
    @Override // _ss_com.streamsets.datacollector.execution.PreviewerListener
    public void outputRetrieved(String str) {
        final String previewerId = str;
        LOG.debug("Removing previewer with id:  '{}' from cache as output is retrieved", previewerId);
        previewerCache.invalidate(previewerId);
    }

    /**
     * Reports whether the stored state of a pipeline carries the remote-pipeline attribute
     * set to {@code true}.
     *
     * @param str pipeline identifier
     * @param str2 pipeline revision
     * @return the attribute's boolean value, or {@code false} when the attribute is absent
     * @throws PipelineStoreException if the state store lookup fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Manager
    public boolean isRemotePipeline(String str, String str2) throws PipelineStoreException {
        final Object remoteFlag = pipelineStateStore
            .getState(str, str2)
            .getAttributes()
            .get(RemoteDataCollector.IS_REMOTE_PIPELINE);
        return remoteFlag != null && ((Boolean) remoteFlag).booleanValue();
    }

    /**
     * Creates a new runner through the runner provider and wraps it in a
     * {@link StatsCollectorRunner}.
     *
     * @param str pipeline identifier
     * @param str2 pipeline revision
     * @param executionMode execution mode to create the runner for; {@code null} is treated
     *     as {@code STANDALONE}
     * @return the wrapped runner
     * @throws PipelineStoreException declared for callers; see the runner provider for when it is raised
     */
    private Runner getRunner(String str, String str2, ExecutionMode executionMode) throws PipelineStoreException {
        final ExecutionMode effectiveMode = executionMode != null ? executionMode : ExecutionMode.STANDALONE;
        return new StatsCollectorRunner(
            runnerProvider.createRunner(str, str2, objectGraph, effectiveMode),
            statsCollector);
    }

    /**
     * Builds the runner cache key for a pipeline: name and revision joined by the
     * name/revision separator.
     *
     * @param str pipeline identifier
     * @param str2 pipeline revision
     * @return {@code str}, the separator, then {@code str2}
     */
    private String getNameAndRevString(String str, String str2) {
        final StringBuilder key = new StringBuilder();
        key.append(str).append(NAME_AND_REV_SEPARATOR).append(str2);
        return key.toString();
    }
}
