package _ss_com.streamsets.datacollector.execution.runner.common;

import _ss_com.streamsets.datacollector.el.JobEL;
import _ss_com.streamsets.datacollector.el.PipelineEL;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner;
import _ss_com.streamsets.datacollector.store.PipelineInfo;
import _ss_com.streamsets.datacollector.util.PipelineException;
import com.streamsets.pipeline.api.impl.Utils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/common/ProductionPipelineRunnable.class */
public class ProductionPipelineRunnable implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ProductionPipelineRunnable.class);
    public static final String RUNNABLE_NAME = "ProductionPipelineRunnable";
    private final StandaloneRunner runner;
    private final ProductionPipeline pipeline;
    private final String name;
    private final String rev;
    private volatile Thread runningThread;
    private volatile boolean nodeProcessShutdown;
    private final List<Future<?>> relatedTasks;
    private volatile boolean isStopped = false;
    private final CountDownLatch countDownLatch;

    public ProductionPipelineRunnable(ThreadHealthReporter threadHealthReporter, StandaloneRunner standaloneRunner, ProductionPipeline productionPipeline, String str, String str2, List<Future<?>> list) {
        this.runner = standaloneRunner;
        this.pipeline = productionPipeline;
        this.rev = str2;
        this.name = str;
        this.relatedTasks = list;
        this.pipeline.setThreadHealthReporter(threadHealthReporter);
        this.countDownLatch = new CountDownLatch(1);
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        if (this.isStopped) {
            throw new IllegalStateException(Utils.format("Pipeline is stopped, cannot start the pipeline '{}::{}'", new Object[]{this.name, this.rev}));
        }
        String name = Thread.currentThread().getName();
        try {
            PipelineInfo info = this.pipeline.getPipelineConf().getInfo();
            if (info != null) {
                Thread.currentThread().setName(Utils.format("{}-{}-{}", new Object[]{RUNNABLE_NAME, info.getPipelineId(), info.getTitle()}));
            } else {
                Thread.currentThread().setName(Utils.format("{}-UNKNOWN_ID-{}", new Object[]{RUNNABLE_NAME, this.name}));
            }
            try {
                try {
                    this.runningThread = Thread.currentThread();
                    this.pipeline.run();
                    this.runningThread = null;
                    cancelTask();
                } catch (Throwable th) {
                    this.runningThread = null;
                    cancelTask();
                    throw th;
                }
            } catch (Error e) {
                LOG.error("A JVM error occurred while running the pipeline, {}", e.toString(), e);
                throw e;
            } catch (Exception e2) {
                if (!this.pipeline.wasStopped()) {
                    LOG.error("An exception occurred while running the pipeline, {}", e2.toString(), e2);
                }
                this.runningThread = null;
                cancelTask();
            }
            PipelineEL.unsetConstantsInContext();
            JobEL.unsetConstantsInContext();
            postStop();
            this.countDownLatch.countDown();
            Thread.currentThread().setName(name);
        } catch (Throwable th2) {
            PipelineEL.unsetConstantsInContext();
            JobEL.unsetConstantsInContext();
            postStop();
            this.countDownLatch.countDown();
            Thread.currentThread().setName(name);
            throw th2;
        }
    }

    public void stop(boolean z) throws PipelineException {
        this.isStopped = true;
        this.nodeProcessShutdown = z;
        this.pipeline.stop();
        try {
            this.countDownLatch.await();
        } catch (InterruptedException e) {
            LOG.info("Thread interrupted: {}", e.toString(), e);
        }
        LOG.info("Pipeline is in terminal state");
    }

    public void forceQuit() {
        synchronized (this.relatedTasks) {
            if (this.runningThread != null) {
                this.runningThread.interrupt();
                this.runningThread = null;
                cancelTask();
                postStop();
            }
        }
        this.countDownLatch.countDown();
    }

    private void cancelTask() {
        for (Future<?> future : this.relatedTasks) {
            LOG.debug("Cancelling task " + future);
            future.cancel(true);
        }
    }

    private void postStop() {
        Thread.currentThread().setName(Thread.currentThread().getName());
        if (!this.pipeline.wasStopped() || this.pipeline.isExecutionFailed()) {
            return;
        }
        try {
            Map<String, String> committedOffsets = this.pipeline.getCommittedOffsets();
            String format = committedOffsets.isEmpty() ? "" : (committedOffsets.size() == 1 && committedOffsets.containsKey("$com.streamsets.datacollector.pollsource.offset$")) ? Utils.format("The last committed source offset is '{}'.", new Object[]{committedOffsets.get("$com.streamsets.datacollector.pollsource.offset$")}) : Utils.format("The last committed source offset is {}.", new Object[]{committedOffsets});
            if (this.nodeProcessShutdown) {
                LOG.info("Changing state of pipeline '{}', '{}' to '{}'", new Object[]{this.name, this.rev, PipelineStatus.DISCONNECTED});
                this.pipeline.getStatusListener().stateChanged(PipelineStatus.DISCONNECTED, "The pipeline was stopped because the node process was shutdown. " + format, null);
            } else {
                LOG.info("Changing state of pipeline '{}', '{}' to '{}'", new Object[]{this.name, this.rev, PipelineStatus.STOPPED});
                this.pipeline.getStatusListener().stateChanged(PipelineStatus.STOPPED, "The pipeline was stopped. " + format, null);
            }
        } catch (PipelineException e) {
            LOG.error("An exception occurred while trying to transition pipeline state, {}", e.toString(), e);
        }
    }

    public String getRev() {
        return this.rev;
    }

    public String getName() {
        return this.name;
    }

    public boolean isStopped() {
        return this.isStopped;
    }
}
