package _ss_com.streamsets.datacollector.runner;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.streamsets.datacollector.execution.runner.common.Constants;
import _ss_com.streamsets.datacollector.memory.MemoryMonitor;
import _ss_com.streamsets.datacollector.memory.MemoryUsageCollector;
import _ss_com.streamsets.datacollector.memory.MemoryUsageCollectorResourceBundle;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.restapi.bean.MetricRegistryJson;
import _ss_com.streamsets.datacollector.runner.FilterRecordBatch;
import _ss_com.streamsets.datacollector.runner.Pipe;
import _ss_com.streamsets.datacollector.util.AggregatorUtil;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.validation.Issue;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.StageType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/runner/StagePipe.class */
public class StagePipe extends Pipe<Context> {
    private static final Logger LOG = LoggerFactory.getLogger(StagePipe.class);
    public static final String RUNTIME_STATS_GAUGE = "RuntimeStatsGauge";
    private Timer processingTimer;
    private Counter memoryConsumedCounter;
    private Meter inputRecordsMeter;
    private Meter outputRecordsMeter;
    private Meter errorRecordsMeter;
    private Meter stageErrorMeter;
    private Counter inputRecordsCounter;
    private Counter outputRecordsCounter;
    private Counter errorRecordsCounter;
    private Counter stageErrorCounter;
    private Histogram inputRecordsHistogram;
    private Histogram outputRecordsHistogram;
    private Histogram errorRecordsHistogram;
    private Histogram stageErrorsHistogram;
    private Map<String, Counter> outputRecordsPerLaneCounter;
    private Map<String, Meter> outputRecordsPerLaneMeter;
    private Context context;
    private final ResourceControlledScheduledExecutor scheduledExecutorService;
    private final MemoryUsageCollectorResourceBundle memoryUsageCollectorResourceBundle;
    private final String name;
    private final String rev;
    private final Configuration configuration;
    private final MetricRegistryJson metricRegistryJson;
    private Map<String, Object> batchMetrics;
    FilterRecordBatch.Predicate[] predicates;

    /* loaded from: input_file:_ss_com/streamsets/datacollector/runner/StagePipe$Context.class */
    public interface Context extends Pipe.Context {
        RuntimeStats getRuntimeStats();
    }

    @VisibleForTesting
    StagePipe(StageRuntime stageRuntime, List<String> list, List<String> list2, List<String> list3) {
        this("myPipeline", "0", new Configuration(), stageRuntime, list, list2, list3, new ResourceControlledScheduledExecutor(0.02f), new MemoryUsageCollectorResourceBundle(), null);
    }

    public StagePipe(String str, String str2, Configuration configuration, StageRuntime stageRuntime, List<String> list, List<String> list2, List<String> list3, ResourceControlledScheduledExecutor resourceControlledScheduledExecutor, MemoryUsageCollectorResourceBundle memoryUsageCollectorResourceBundle, MetricRegistryJson metricRegistryJson) {
        super(stageRuntime, list, list2, list3);
        this.name = str;
        this.rev = str2;
        this.configuration = configuration;
        this.scheduledExecutorService = resourceControlledScheduledExecutor;
        this.memoryUsageCollectorResourceBundle = memoryUsageCollectorResourceBundle;
        this.metricRegistryJson = metricRegistryJson;
        this.batchMetrics = new HashMap();
    }

    @Override // _ss_com.streamsets.datacollector.runner.Pipe
    public List<Issue> init(Context context) throws StageException {
        List<Issue> init = getStage().init();
        if (init.isEmpty()) {
            MetricRegistry metrics = getStage().getContext().getMetrics();
            String str = "stage." + getStage().getConfiguration().getInstanceName();
            this.processingTimer = MetricsConfigurator.createStageTimer(metrics, str + ".batchProcessing", this.name, this.rev);
            this.memoryConsumedCounter = MetricsConfigurator.createStageCounter(metrics, str + ".memoryConsumed", this.name, this.rev);
            this.inputRecordsMeter = MetricsConfigurator.createStageMeter(metrics, str + ".inputRecords", this.name, this.rev);
            this.outputRecordsMeter = MetricsConfigurator.createStageMeter(metrics, str + ".outputRecords", this.name, this.rev);
            this.errorRecordsMeter = MetricsConfigurator.createStageMeter(metrics, str + ".errorRecords", this.name, this.rev);
            this.stageErrorMeter = MetricsConfigurator.createStageMeter(metrics, str + ".stageErrors", this.name, this.rev);
            this.inputRecordsCounter = MetricsConfigurator.createStageCounter(metrics, str + ".inputRecords", this.name, this.rev);
            this.outputRecordsCounter = MetricsConfigurator.createStageCounter(metrics, str + ".outputRecords", this.name, this.rev);
            this.errorRecordsCounter = MetricsConfigurator.createStageCounter(metrics, str + ".errorRecords", this.name, this.rev);
            this.stageErrorCounter = MetricsConfigurator.createStageCounter(metrics, str + ".stageErrors", this.name, this.rev);
            this.inputRecordsHistogram = MetricsConfigurator.createStageHistogram5Min(metrics, str + ".inputRecords", this.name, this.rev);
            this.outputRecordsHistogram = MetricsConfigurator.createStageHistogram5Min(metrics, str + ".outputRecords", this.name, this.rev);
            this.errorRecordsHistogram = MetricsConfigurator.createStageHistogram5Min(metrics, str + ".errorRecords", this.name, this.rev);
            this.stageErrorsHistogram = MetricsConfigurator.createStageHistogram5Min(metrics, str + ".stageErrors", this.name, this.rev);
            if (this.metricRegistryJson != null) {
                this.inputRecordsMeter.mark(this.metricRegistryJson.getMeters().get(str + ".inputRecords" + MetricsConfigurator.METER_SUFFIX).getCount());
                this.outputRecordsMeter.mark(this.metricRegistryJson.getMeters().get(str + ".outputRecords" + MetricsConfigurator.METER_SUFFIX).getCount());
                this.errorRecordsMeter.mark(this.metricRegistryJson.getMeters().get(str + ".errorRecords" + MetricsConfigurator.METER_SUFFIX).getCount());
                this.stageErrorMeter.mark(this.metricRegistryJson.getMeters().get(str + ".stageErrors" + MetricsConfigurator.METER_SUFFIX).getCount());
                this.inputRecordsCounter.inc(this.metricRegistryJson.getCounters().get(str + ".inputRecords" + MetricsConfigurator.COUNTER_SUFFIX).getCount());
                this.outputRecordsCounter.inc(this.metricRegistryJson.getCounters().get(str + ".outputRecords" + MetricsConfigurator.COUNTER_SUFFIX).getCount());
                this.errorRecordsCounter.inc(this.metricRegistryJson.getCounters().get(str + ".errorRecords" + MetricsConfigurator.COUNTER_SUFFIX).getCount());
                this.stageErrorCounter.inc(this.metricRegistryJson.getCounters().get(str + ".stageErrors" + MetricsConfigurator.COUNTER_SUFFIX).getCount());
                this.inputRecordsHistogram.update(this.metricRegistryJson.getHistograms().get(str + ".inputRecords" + MetricsConfigurator.HISTOGRAM_M5_SUFFIX).getCount());
                this.outputRecordsHistogram.update(this.metricRegistryJson.getHistograms().get(str + ".outputRecords" + MetricsConfigurator.HISTOGRAM_M5_SUFFIX).getCount());
                this.errorRecordsHistogram.update(this.metricRegistryJson.getHistograms().get(str + ".errorRecords" + MetricsConfigurator.HISTOGRAM_M5_SUFFIX).getCount());
                this.stageErrorsHistogram.update(this.metricRegistryJson.getHistograms().get(str + ".stageErrors" + MetricsConfigurator.HISTOGRAM_M5_SUFFIX).getCount());
            }
            if (!getStage().getConfiguration().getOutputAndEventLanes().isEmpty()) {
                this.outputRecordsPerLaneCounter = new HashMap();
                this.outputRecordsPerLaneMeter = new HashMap();
                for (String str2 : getStage().getConfiguration().getOutputAndEventLanes()) {
                    Counter createStageCounter = MetricsConfigurator.createStageCounter(metrics, str + Constants.MASTER_SDC_ID_SEPARATOR + str2 + ".outputRecords", this.name, this.rev);
                    if (this.metricRegistryJson != null) {
                        createStageCounter.inc(this.metricRegistryJson.getCounters().get(str + Constants.MASTER_SDC_ID_SEPARATOR + str2 + ".outputRecords" + MetricsConfigurator.COUNTER_SUFFIX).getCount());
                    }
                    this.outputRecordsPerLaneCounter.put(str2, createStageCounter);
                    Meter createStageMeter = MetricsConfigurator.createStageMeter(metrics, str + Constants.MASTER_SDC_ID_SEPARATOR + str2 + ".outputRecords", this.name, this.rev);
                    if (this.metricRegistryJson != null) {
                        createStageMeter.mark(this.metricRegistryJson.getMeters().get(str + Constants.MASTER_SDC_ID_SEPARATOR + str2 + ".outputRecords" + MetricsConfigurator.METER_SUFFIX).getCount());
                    }
                    this.outputRecordsPerLaneMeter.put(str2, createStageMeter);
                }
            }
            this.context = context;
            if (this.configuration.get("monitor.memory", false)) {
                LOG.info("Starting memory collector for {}", getStage().getInfo().getInstanceName());
                this.scheduledExecutorService.submit(new MemoryMonitor(this.memoryConsumedCounter, () -> {
                    return new MemoryUsageCollector.Builder().setMemoryUsageCollectorResourceBundle(this.memoryUsageCollectorResourceBundle).setStageRuntime(getStage()).build();
                }));
            }
            createRuntimeStatsGauge(metrics);
            this.predicates = new FilterRecordBatch.Predicate[2];
            this.predicates[0] = new RequiredFieldsPredicate(getStage().getRequiredFields());
            this.predicates[1] = new PreconditionsPredicate(getStage().getContext(), getStage().getPreconditions());
        }
        return init;
    }

    @Override // _ss_com.streamsets.datacollector.runner.Pipe
    public void process(PipeBatch pipeBatch) throws StageException {
        BatchMakerImpl startStage = pipeBatch.startStage(this);
        BatchImpl batch = pipeBatch.getBatch(this);
        ErrorSink errorSink = pipeBatch.getErrorSink();
        EventSink eventSink = pipeBatch.getEventSink();
        ProcessedSink processedSink = pipeBatch.getProcessedSink();
        SourceResponseSink sourceResponseSink = pipeBatch.getSourceResponseSink();
        String previousOffset = pipeBatch.getPreviousOffset();
        getStage().setSinks(errorSink, eventSink, processedSink, sourceResponseSink);
        FilterRecordBatch filterRecordBatch = new FilterRecordBatch(batch, this.predicates, getStage().getContext());
        long currentTimeMillis = System.currentTimeMillis();
        String execute = getStage().execute(previousOffset, pipeBatch.getBatchSize(), filterRecordBatch, startStage, errorSink, eventSink, processedSink, sourceResponseSink);
        if (isSource()) {
            pipeBatch.setNewOffset(execute);
        }
        this.batchMetrics = finishBatchAndCalculateMetrics(currentTimeMillis, pipeBatch, startStage, batch, errorSink, eventSink, execute);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, Object> finishBatchAndCalculateMetrics(long j, PipeBatch pipeBatch, BatchMakerImpl batchMakerImpl, BatchImpl batchImpl, ErrorSink errorSink, EventSink eventSink, String str) throws StageException {
        long currentTimeMillis = System.currentTimeMillis() - j;
        this.processingTimer.update(currentTimeMillis, TimeUnit.MILLISECONDS);
        int size = batchImpl.getSize();
        this.inputRecordsCounter.inc(size);
        this.inputRecordsMeter.mark(size);
        this.inputRecordsHistogram.update(size);
        int size2 = errorSink.getErrorRecords(getStage().getInfo().getInstanceName()).size();
        this.errorRecordsCounter.inc(size2);
        this.errorRecordsMeter.mark(size2);
        this.errorRecordsHistogram.update(size2);
        int size3 = batchMakerImpl.getSize();
        if (isTargetOrExecutor()) {
            size3 = size - size2;
        }
        this.outputRecordsCounter.inc(size3);
        this.outputRecordsMeter.mark(size3);
        this.outputRecordsHistogram.update(size3);
        int size4 = errorSink.getStageErrors(getStage().getInfo().getInstanceName()).size();
        increaseStageErrorMetrics(size4);
        HashMap hashMap = new HashMap();
        if (!getStage().getConfiguration().getOutputLanes().isEmpty()) {
            for (String str2 : getStage().getConfiguration().getOutputLanes()) {
                int size5 = batchMakerImpl.getSize(str2);
                hashMap.put(str2, Integer.valueOf(size5));
                this.outputRecordsPerLaneCounter.get(str2).inc(size5);
                this.outputRecordsPerLaneMeter.get(str2).mark(size5);
            }
        }
        if (!getStage().getConfiguration().getEventLanes().isEmpty()) {
            String str3 = getStage().getConfiguration().getEventLanes().get(0);
            int size6 = eventSink.getStageEvents(getStage().getInfo().getInstanceName()).size();
            hashMap.put(str3, Integer.valueOf(size6));
            this.outputRecordsPerLaneCounter.get(str3).inc(size6);
            this.outputRecordsPerLaneMeter.get(str3).mark(size6);
        }
        HashMap hashMap2 = new HashMap();
        hashMap2.put(AggregatorUtil.PROCESSING_TIME, Long.valueOf(currentTimeMillis));
        hashMap2.put(AggregatorUtil.INPUT_RECORDS, Integer.valueOf(size));
        hashMap2.put(AggregatorUtil.ERROR_RECORDS, Integer.valueOf(size2));
        hashMap2.put(AggregatorUtil.OUTPUT_RECORDS, Integer.valueOf(size3));
        hashMap2.put(AggregatorUtil.STAGE_ERROR, Integer.valueOf(size4));
        hashMap2.put(AggregatorUtil.OUTPUT_RECORDS_PER_LANE, hashMap);
        pipeBatch.completeStage(batchMakerImpl);
        if (isSource()) {
            if (size3 > 0) {
                this.context.getRuntimeStats().setTimeOfLastReceivedRecord(System.currentTimeMillis());
            }
            this.context.getRuntimeStats().incBatchCount();
        }
        return hashMap2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void increaseStageErrorMetrics(int i) {
        this.stageErrorCounter.inc(i);
        this.stageErrorMeter.mark(i);
        this.stageErrorsHistogram.update(i);
    }

    @Override // _ss_com.streamsets.datacollector.runner.Pipe
    public void destroy(PipeBatch pipeBatch) throws StageException {
        EventSink eventSink = pipeBatch.getEventSink();
        getStage().destroy(pipeBatch.getErrorSink(), eventSink, pipeBatch.getProcessedSink());
        pipeBatch.completeStage(this);
    }

    public long getMemoryConsumed() {
        return this.memoryConsumedCounter.getCount();
    }

    public Map<String, Object> getBatchMetrics() {
        return this.batchMetrics;
    }

    private Gauge<Object> createRuntimeStatsGauge(MetricRegistry metricRegistry) {
        Gauge<Object> gauge = MetricsConfigurator.getGauge(metricRegistry, RUNTIME_STATS_GAUGE);
        if (gauge == null) {
            gauge = () -> {
                return this.context.getRuntimeStats();
            };
            try {
                MetricsConfigurator.createGauge(metricRegistry, RUNTIME_STATS_GAUGE, gauge, this.name, this.rev);
            } catch (Exception e) {
                for (StackTraceElement stackTraceElement : e.getStackTrace()) {
                    LOG.error(stackTraceElement.toString());
                }
                throw e;
            }
        }
        return gauge;
    }

    private boolean isSource() {
        return getStage().getDefinition().getType() == StageType.SOURCE;
    }

    private boolean isTargetOrExecutor() {
        return getStage().getDefinition().getType().isOneOf(new StageType[]{StageType.TARGET, StageType.EXECUTOR});
    }
}
