package _ss_com.streamsets.datacollector.runner;

import _ss_com.com.google.common.collect.ImmutableList;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.pipeline.lib.log.LogConstants;
import com.codahale.metrics.MetricRegistry;
import com.streamsets.pipeline.api.OffsetCommitTrigger;
import com.streamsets.pipeline.api.OnRecordError;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.Target;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.MDC;

/* loaded from: input_file:_ss_com/streamsets/datacollector/runner/PipeRunner.class */
/**
 * Holds an immutable, ordered list of {@link Pipe}s for one runner and drives callers' actions over
 * them, while publishing progress information (current stage, offsets, timings, batch count) into a
 * runtime metric gauge map.
 *
 * <p>While an action is being executed, the runner id is stored in the SLF4J {@link MDC} under
 * {@link LogConstants#RUNNER}; it is set back to the empty string afterwards.
 */
public class PipeRunner {
    public static final String METRIC_BATCH_COUNT = "batchCount";
    public static final String METRIC_OFFSET_KEY = "offsetKey";
    public static final String METRIC_OFFSET_VALUE = "offsetValue";
    public static final String METRIC_CURRENT_STAGE = "currentStage";
    public static final String METRIC_BATCH_START_TIME = "batchStartTime";
    public static final String METRIC_STAGE_START_TIME = "stageStartTime";

    /** Value stored under {@link #METRIC_CURRENT_STAGE} when no batch is being executed. */
    public static final String IDLE = "IDLE";

    private final int runnerId;
    private final List<Pipe> pipes;
    private final Map<String, Object> runtimeMetricGauge;

    /**
     * A consumer that is allowed to throw the checked exceptions raised by pipeline execution.
     *
     * @param <T> type of the consumed value
     */
    @FunctionalInterface
    public interface ThrowingConsumer<T> {
        void accept(T t) throws PipelineRuntimeException, StageException;
    }

    /**
     * Creates a runner over a snapshot of the given pipes and registers its runtime gauge.
     *
     * @param str first identifying string forwarded to {@code MetricsConfigurator.createStageGauge}
     *     (NOTE(review): presumably the pipeline name - confirm against the caller)
     * @param str2 second identifying string forwarded to {@code MetricsConfigurator.createStageGauge}
     *     (NOTE(review): presumably the pipeline revision - confirm against the caller)
     * @param runnerId numeric id of this runner; also used to build the gauge name {@code "runner.<id>"}
     * @param metricRegistry registry in which the gauge is created
     * @param list pipes to run, in execution order; copied into an immutable list
     */
    public PipeRunner(String str, String str2, int runnerId, MetricRegistry metricRegistry, List<Pipe> list) {
        this.runnerId = runnerId;
        this.pipes = ImmutableList.copyOf(list);

        // The gauge's generic type is not visible from this file, so the cast is kept explicit.
        @SuppressWarnings("unchecked")
        Map<String, Object> gaugeValue = (Map<String, Object>) MetricsConfigurator
            .createStageGauge(metricRegistry, "runner." + runnerId, null, str, str2)
            .getValue();
        this.runtimeMetricGauge = gaugeValue;

        this.runtimeMetricGauge.put(METRIC_BATCH_COUNT, 0L);
        resetBatchSpecificMetrics();
    }

    /**
     * @param i zero-based position of the pipe
     * @return the pipe at the given position
     */
    public Pipe get(int i) {
        return this.pipes.get(i);
    }

    /** @return the id this runner was created with */
    public int getRunnerId() {
        return this.runnerId;
    }

    /** @return number of pipes held by this runner */
    public int size() {
        return this.pipes.size();
    }

    /** @return the immutable list of pipes, in execution order */
    public List<Pipe> getPipes() {
        return this.pipes;
    }

    /**
     * Applies {@code throwingConsumer} to every pipe in order, as one batch, keeping the runtime
     * gauge up to date while doing so.
     *
     * <p>On successful completion the batch counter in the gauge is incremented. Whether the batch
     * succeeds or fails, the batch-specific gauge entries are reset and the runner entry in the
     * {@link MDC} is set back to the empty string.
     *
     * @param offsetKey offset key of the batch, may be {@code null} (recorded as empty string)
     * @param offsetValue offset value of the batch, may be {@code null} (recorded as empty string)
     * @param batchStartTime value recorded under {@link #METRIC_BATCH_START_TIME}
     * @param throwingConsumer action to run against each pipe
     * @throws PipelineRuntimeException propagated from the consumer
     * @throws StageException propagated from the consumer
     */
    public void executeBatch(
        String offsetKey,
        String offsetValue,
        long batchStartTime,
        ThrowingConsumer<Pipe> throwingConsumer
    ) throws PipelineRuntimeException, StageException {
        MDC.put(LogConstants.RUNNER, String.valueOf(this.runnerId));

        // Information that stays constant for the duration of the batch.
        this.runtimeMetricGauge.put(METRIC_BATCH_START_TIME, batchStartTime);
        this.runtimeMetricGauge.put(METRIC_OFFSET_KEY, Optional.ofNullable(offsetKey).orElse(""));
        // Fix: the value used to be written under METRIC_OFFSET_KEY, clobbering the key and
        // leaving METRIC_OFFSET_VALUE empty.
        this.runtimeMetricGauge.put(METRIC_OFFSET_VALUE, Optional.ofNullable(offsetValue).orElse(""));
        this.runtimeMetricGauge.put(METRIC_STAGE_START_TIME, System.currentTimeMillis());

        try {
            for (Pipe pipe : this.pipes) {
                this.runtimeMetricGauge.put(METRIC_CURRENT_STAGE, pipe.getStage().getInfo().getInstanceName());
                // Only StagePipe instances restart the stage timer; other pipe kinds keep the
                // previously recorded stage start time.
                if (pipe instanceof StagePipe) {
                    this.runtimeMetricGauge.put(METRIC_STAGE_START_TIME, System.currentTimeMillis());
                }
                throwingConsumer.accept(pipe);
            }

            // Counted only when every pipe completed without throwing.
            this.runtimeMetricGauge.computeIfPresent(
                METRIC_BATCH_COUNT,
                (name, count) -> Long.valueOf(((Long) count) + 1L)
            );
        } finally {
            resetBatchSpecificMetrics();
            MDC.put(LogConstants.RUNNER, "");
        }
    }

    /** Puts the per-batch gauge entries back to their idle values. */
    private void resetBatchSpecificMetrics() {
        this.runtimeMetricGauge.put(METRIC_CURRENT_STAGE, IDLE);
        this.runtimeMetricGauge.put(METRIC_OFFSET_KEY, "");
        this.runtimeMetricGauge.put(METRIC_OFFSET_VALUE, "");
        this.runtimeMetricGauge.put(METRIC_BATCH_START_TIME, 0L);
    }

    /**
     * Applies {@code throwingConsumer} to every pipe in order without touching the runtime gauge.
     *
     * <p>Checked {@link PipelineException}s and {@link StageException}s thrown by the consumer are
     * rethrown wrapped in a {@link RuntimeException} with the original as its cause. The runner
     * entry in the {@link MDC} is set back to the empty string in all cases.
     *
     * @param throwingConsumer action to run against each pipe
     * @throws RuntimeException wrapping any checked exception raised by the consumer
     */
    public void forEach(ThrowingConsumer<Pipe> throwingConsumer) {
        MDC.put(LogConstants.RUNNER, String.valueOf(this.runnerId));
        try {
            for (Pipe pipe : this.pipes) {
                throwingConsumer.accept(pipe);
            }
        } catch (PipelineException | StageException e) {
            throw new RuntimeException(e);
        } finally {
            MDC.put(LogConstants.RUNNER, "");
        }
    }

    /**
     * Finds the first pipe whose underlying stage is both a {@link Target} and an
     * {@link OffsetCommitTrigger}.
     *
     * @return that stage, or {@code null} when no pipe has such a stage
     */
    public OffsetCommitTrigger getOffsetCommitTrigger() {
        for (Pipe pipe : this.pipes) {
            // Held as Object so the instanceof checks and the cast are valid regardless of the
            // declared return type of getStage().
            Object stage = pipe.getStage().getStage();
            if (stage instanceof Target && stage instanceof OffsetCommitTrigger) {
                return (OffsetCommitTrigger) stage;
            }
        }
        return null;
    }

    /**
     * @return {@code true} when at least one pipe's stage context reports
     *     {@link OnRecordError#STOP_PIPELINE} as its on-record-error setting
     */
    public boolean onRecordErrorStopPipeline() {
        for (Pipe pipe : this.pipes) {
            if (pipe.getStage().getContext().getOnErrorRecord() == OnRecordError.STOP_PIPELINE) {
                return true;
            }
        }
        return false;
    }
}
