package _ss_com.streamsets.datacollector.runner.preview;

import _ss_com.com.google.common.base.Preconditions;
import _ss_com.com.google.common.base.Throwables;
import _ss_com.com.google.common.collect.ImmutableList;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.creation.PipelineConfigBean;
import _ss_com.streamsets.datacollector.el.JobEL;
import _ss_com.streamsets.datacollector.el.PipelineEL;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.restapi.bean.MetricRegistryJson;
import _ss_com.streamsets.datacollector.runner.BatchContextImpl;
import _ss_com.streamsets.datacollector.runner.BatchImpl;
import _ss_com.streamsets.datacollector.runner.BatchListener;
import _ss_com.streamsets.datacollector.runner.ErrorSink;
import _ss_com.streamsets.datacollector.runner.EventSink;
import _ss_com.streamsets.datacollector.runner.FullPipeBatch;
import _ss_com.streamsets.datacollector.runner.MultiplexerPipe;
import _ss_com.streamsets.datacollector.runner.Observer;
import _ss_com.streamsets.datacollector.runner.ObserverPipe;
import _ss_com.streamsets.datacollector.runner.PipeContext;
import _ss_com.streamsets.datacollector.runner.PipeRunner;
import _ss_com.streamsets.datacollector.runner.PipelineRunner;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.runner.ProcessedSink;
import _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate;
import _ss_com.streamsets.datacollector.runner.RunnerPool;
import _ss_com.streamsets.datacollector.runner.RuntimeStats;
import _ss_com.streamsets.datacollector.runner.SourceOffsetTracker;
import _ss_com.streamsets.datacollector.runner.SourcePipe;
import _ss_com.streamsets.datacollector.runner.SourceResponseSink;
import _ss_com.streamsets.datacollector.runner.StageOutput;
import _ss_com.streamsets.datacollector.runner.StagePipe;
import _ss_com.streamsets.datacollector.runner.StageRuntime;
import _ss_com.streamsets.datacollector.runner.production.BadRecordsHandler;
import _ss_com.streamsets.datacollector.runner.production.ReportErrorDelegate;
import _ss_com.streamsets.datacollector.runner.production.StatsAggregationHandler;
import _ss_com.streamsets.datacollector.util.ValidationUtil;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.streamsets.pipeline.api.BatchContext;
import com.streamsets.pipeline.api.PushSource;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.StageType;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/runner/preview/PreviewPipelineRunner.class */
public class PreviewPipelineRunner implements PipelineRunner, PushSourceContextDelegate, ReportErrorDelegate {
    private static final Logger LOG = LoggerFactory.getLogger(PreviewPipelineRunner.class);
    private final RuntimeInfo runtimeInfo;
    private final SourceOffsetTracker offsetTracker;
    private final int batchSize;
    private final int batches;
    private final boolean skipTargets;
    private final boolean skipLifecycleEvents;
    private final boolean testOrigin;
    private final String name;
    private final String rev;
    private final Timer processingTimer;
    private SourcePipe originPipe;
    private List<PipeRunner> pipes;
    private RunnerPool<PipeRunner> runnerPool;
    private BadRecordsHandler badRecordsHandler;
    private StatsAggregationHandler statsAggregationHandler;
    private Map<String, StageOutput> stagesToSkip;
    private AtomicInteger batchesProcessed;
    private PipelineConfiguration pipelineConfiguration;
    private volatile Throwable exceptionFromExecution = null;
    private final MetricRegistry metrics = new MetricRegistry();
    private final List<List<StageOutput>> batchesOutput = Collections.synchronizedList(new ArrayList());

    public PreviewPipelineRunner(String str, String str2, RuntimeInfo runtimeInfo, SourceOffsetTracker sourceOffsetTracker, int i, int i2, boolean z, boolean z2, boolean z3) {
        this.name = str;
        this.rev = str2;
        this.runtimeInfo = runtimeInfo;
        this.offsetTracker = sourceOffsetTracker;
        this.batchSize = i;
        this.batches = i2;
        this.skipTargets = z;
        this.skipLifecycleEvents = z2;
        this.testOrigin = z3;
        this.processingTimer = MetricsConfigurator.createTimer(this.metrics, "pipeline.batchProcessing", str, str2);
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public MetricRegistryJson getMetricRegistryJson() {
        return null;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void errorNotification(SourcePipe sourcePipe, List<PipeRunner> list, Throwable th) {
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public RuntimeInfo getRuntimeInfo() {
        return this.runtimeInfo;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public boolean isPreview() {
        return true;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public MetricRegistry getMetrics() {
        return this.metrics;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void runLifecycleEvent(Record record, StageRuntime stageRuntime) throws StageException {
        if (this.skipLifecycleEvents) {
            return;
        }
        BatchImpl batchImpl = new BatchImpl(stageRuntime.getConfiguration().getInstanceName(), "", "", ImmutableList.of(record));
        Preconditions.checkArgument(stageRuntime.getDefinition().getType().isOneOf(new StageType[]{StageType.EXECUTOR, StageType.TARGET}), "Invalid lifecycle event stage type: " + stageRuntime.getDefinition().getType());
        stageRuntime.execute(null, 1000, batchImpl, null, new ErrorSink(), new EventSink(), new ProcessedSink(), new SourceResponseSink());
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void run(SourcePipe sourcePipe, List<PipeRunner> list, BadRecordsHandler badRecordsHandler, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        run(sourcePipe, list, badRecordsHandler, Collections.emptyList(), statsAggregationHandler);
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void run(SourcePipe sourcePipe, List<PipeRunner> list, BadRecordsHandler badRecordsHandler, List<StageOutput> list2, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        this.originPipe = sourcePipe;
        this.pipes = list;
        this.badRecordsHandler = badRecordsHandler;
        this.statsAggregationHandler = statsAggregationHandler;
        this.runnerPool = new RunnerPool<>(list, new RuntimeStats(), new Histogram(new ExponentiallyDecayingReservoir()));
        this.batchesProcessed = new AtomicInteger(0);
        this.stagesToSkip = new HashMap();
        for (StageOutput stageOutput : list2) {
            this.stagesToSkip.put(stageOutput.getInstanceName(), stageOutput);
        }
        if (sourcePipe.getStage().getStage() instanceof PushSource) {
            runPushSource();
        } else {
            runPollSource();
        }
    }

    private void runPushSource() throws StageException, PipelineRuntimeException {
        this.originPipe.getStage().setPushSourceContextDelegate(this);
        if (this.stagesToSkip.containsKey(this.originPipe.getStage().getInfo().getInstanceName())) {
            runPollSource();
        } else {
            this.originPipe.process(this.offsetTracker.getOffsets(), this.batchSize, this);
        }
        if (this.exceptionFromExecution != null) {
            Throwables.propagateIfInstanceOf(this.exceptionFromExecution, StageException.class);
            Throwables.propagateIfInstanceOf(this.exceptionFromExecution, PipelineRuntimeException.class);
            Throwables.propagate(this.exceptionFromExecution);
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public BatchContext startBatch() {
        BatchContextImpl batchContextImpl = new BatchContextImpl(new FullPipeBatch(null, null, this.batchSize, true));
        this.originPipe.prepareBatchContext(batchContextImpl);
        PipelineEL.setConstantsInContext(this.pipelineConfiguration, this.originPipe.getStage().getContext().getUserContext(), System.currentTimeMillis());
        JobEL.setConstantsInContext(null);
        return batchContextImpl;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public boolean processBatch(BatchContext batchContext, String str, String str2) {
        try {
            try {
                BatchContextImpl batchContextImpl = (BatchContextImpl) batchContext;
                this.originPipe.finishBatchContext(batchContextImpl);
                runSourceLessBatch(batchContextImpl.getStartTime(), batchContextImpl.getPipeBatch(), str, str2);
                if (this.batchesProcessed.get() >= this.batches) {
                    this.originPipe.getStage().getContext().setStop(true);
                }
                PipelineEL.unsetConstantsInContext();
                JobEL.unsetConstantsInContext();
                return true;
            } catch (Throwable th) {
                LOG.error("Error while executing preview", th);
                synchronized (this) {
                    if (this.exceptionFromExecution == null) {
                        this.exceptionFromExecution = th;
                    }
                    this.originPipe.getStage().getContext().setStop(true);
                    PipelineEL.unsetConstantsInContext();
                    JobEL.unsetConstantsInContext();
                    return false;
                }
            }
        } catch (Throwable th2) {
            PipelineEL.unsetConstantsInContext();
            JobEL.unsetConstantsInContext();
            throw th2;
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public void commitOffset(String str, String str2) {
    }

    private void runPollSource() throws StageException, PipelineRuntimeException {
        while (this.batchesProcessed.get() < this.batches) {
            FullPipeBatch fullPipeBatch = new FullPipeBatch("$com.streamsets.datacollector.pollsource.offset$", this.offsetTracker.getOffsets().get("$com.streamsets.datacollector.pollsource.offset$"), this.batchSize, true);
            long currentTimeMillis = System.currentTimeMillis();
            StageOutput stageOutput = this.stagesToSkip.get(this.originPipe.getStage().getInfo().getInstanceName());
            if (stageOutput == null) {
                this.originPipe.process(fullPipeBatch);
            } else {
                fullPipeBatch.overrideStageOutput(this.originPipe, stageOutput);
            }
            runSourceLessBatch(currentTimeMillis, fullPipeBatch, "$com.streamsets.datacollector.pollsource.offset$", fullPipeBatch.getNewOffset());
        }
    }

    private void runSourceLessBatch(long j, FullPipeBatch fullPipeBatch, String str, String str2) throws StageException, PipelineRuntimeException {
        PipeRunner pipeRunner = null;
        try {
            pipeRunner = this.runnerPool.getRunner();
            pipeRunner.executeBatch(str, str2, j, pipe -> {
                StageOutput stageOutput = this.stagesToSkip.get(pipe.getStage().getInfo().getInstanceName());
                if (stageOutput != null && !(pipe instanceof ObserverPipe) && !(pipe instanceof MultiplexerPipe)) {
                    if (pipe instanceof StagePipe) {
                        fullPipeBatch.overrideStageOutput((StagePipe) pipe, stageOutput);
                    }
                } else if (this.skipTargets && pipe.getStage().getDefinition().getType().isOneOf(new StageType[]{StageType.TARGET, StageType.EXECUTOR})) {
                    fullPipeBatch.skipStage(pipe);
                } else {
                    pipe.process(fullPipeBatch);
                }
            });
            if (pipeRunner != null) {
                this.runnerPool.returnRunner(pipeRunner);
            }
            this.offsetTracker.commitOffset(str, str2);
            this.processingTimer.update(System.currentTimeMillis() - j, TimeUnit.MILLISECONDS);
            if (ValidationUtil.isSnapshotOutputUsable(fullPipeBatch.getSnapshotsOfAllStagesOutput())) {
                this.batchesOutput.add(fullPipeBatch.getSnapshotsOfAllStagesOutput());
                this.batchesProcessed.incrementAndGet();
            }
        } catch (Throwable th) {
            if (pipeRunner != null) {
                this.runnerPool.returnRunner(pipeRunner);
            }
            throw th;
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void destroy(SourcePipe sourcePipe, List<PipeRunner> list, BadRecordsHandler badRecordsHandler, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        sourcePipe.getStage().getContext().setStop(true);
        if (this.runnerPool != null) {
            this.runnerPool.destroy();
        }
        long currentTimeMillis = System.currentTimeMillis();
        FullPipeBatch fullPipeBatch = new FullPipeBatch(null, null, this.batchSize, false);
        fullPipeBatch.skipStage(sourcePipe);
        sourcePipe.destroy(fullPipeBatch);
        for (PipeRunner pipeRunner : list) {
            FullPipeBatch fullPipeBatch2 = new FullPipeBatch(null, null, this.batchSize, true);
            fullPipeBatch2.skipStage(sourcePipe);
            pipeRunner.executeBatch(null, null, currentTimeMillis, pipe -> {
                if (pipe instanceof StagePipe) {
                    fullPipeBatch2.startStage((StagePipe) pipe);
                }
                pipe.destroy(fullPipeBatch2);
            });
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public List<List<StageOutput>> getBatchesOutput() {
        return this.batchesOutput;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void setObserver(Observer observer) {
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void registerListener(BatchListener batchListener) {
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void setRuntimeConfiguration(PipeContext pipeContext, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) {
        this.pipelineConfiguration = pipelineConfiguration;
    }

    @Override // _ss_com.streamsets.datacollector.runner.production.ReportErrorDelegate
    public void reportError(String str, ErrorMessage errorMessage) {
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineFinisherDelegate
    public void setFinished() {
        LOG.info("PreviewPipelineRunner:  setFinished() was called. ");
    }
}
