package _ss_com.streamsets.datacollector.execution.runner.common;

import _ss_com.streamsets.datacollector.execution.alerts.AlertManager;
import _ss_com.streamsets.datacollector.restapi.bean.MetricRegistryJson;
import _ss_com.streamsets.datacollector.runner.production.DataRulesEvaluationRequest;
import _ss_com.streamsets.datacollector.runner.production.PipelineErrorNotificationRequest;
import _ss_com.streamsets.datacollector.runner.production.RulesConfigurationChangeRequest;
import _ss_com.streamsets.datacollector.util.Configuration;
import com.codahale.metrics.MetricRegistry;
import com.streamsets.pipeline.api.Record;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/common/DataObserverRunnable.class */
/**
 * Background task that drains a queue of observer requests and hands each one to a
 * {@link DataObserverRunner}.
 *
 * <p>While running, the task reports its health to the supplied {@link ThreadHealthReporter}
 * once per loop iteration and temporarily appends {@link #RUNNABLE_NAME} to the name of the
 * executing thread. The loop ends when the thread is interrupted.
 *
 * <p>The request queue must be supplied through {@link #setRequestQueue(BlockingQueue)} before
 * {@link #run()} is invoked; {@code run()} dereferences it without a null check.
 */
public class DataObserverRunnable implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DataObserverRunnable.class);

    /** Name used for health reporting and as the suffix appended to the worker thread's name. */
    public static final String RUNNABLE_NAME = "DataObserverRunnable";

    /** Value passed to the health reporter as the scheduled delay of this runnable. */
    private static final int SCHEDULED_DELAY = -1;

    /** Maximum time, in milliseconds, a single poll of the request queue may block. */
    private static final long POLL_TIMEOUT_MILLIS = 1000L;

    private BlockingQueue<Object> requestQueue;
    private final DataObserverRunner dataObserverRunner;
    private final ThreadHealthReporter threadHealthReporter;

    /**
     * Creates the runnable together with the {@link DataObserverRunner} it delegates to.
     *
     * @param str first identifier forwarded to the {@code DataObserverRunner} constructor
     *     (presumably the pipeline name -- TODO confirm against DataObserverRunner)
     * @param str2 second identifier forwarded to the {@code DataObserverRunner} constructor
     *     (presumably the pipeline revision -- TODO confirm)
     * @param threadHealthReporter receives a health report on every loop iteration of {@link #run()}
     * @param metricRegistry forwarded to the {@code DataObserverRunner}
     * @param alertManager forwarded to the {@code DataObserverRunner}
     * @param configuration forwarded to the {@code DataObserverRunner}
     * @param map forwarded to the {@code DataObserverRunner}; its contents are not inspected here
     */
    public DataObserverRunnable(String str, String str2, ThreadHealthReporter threadHealthReporter, MetricRegistry metricRegistry, AlertManager alertManager, Configuration configuration, Map<String, Object> map) {
        this.dataObserverRunner = new DataObserverRunner(str, str2, metricRegistry, alertManager, configuration, map);
        this.threadHealthReporter = threadHealthReporter;
    }

    /**
     * Sets the queue from which {@link #run()} takes its requests.
     *
     * @param blockingQueue queue of request objects; must be set before {@code run()} starts
     */
    public void setRequestQueue(BlockingQueue<Object> blockingQueue) {
        this.requestQueue = blockingQueue;
    }

    /**
     * Passes the statistics queue through to the underlying {@link DataObserverRunner}.
     *
     * @param blockingQueue queue of records handed to the runner
     */
    public void setStatsQueue(BlockingQueue<Record> blockingQueue) {
        this.dataObserverRunner.setStatsQueue(blockingQueue);
    }

    /**
     * Polls the request queue until the thread is interrupted, dispatching each request to the
     * matching handler of the {@link DataObserverRunner}.
     *
     * <p>Requests of an unrecognised type are logged at error level and dropped. On interruption
     * the interrupt status of the thread is restored before returning so that the owner of the
     * thread can still observe it. The original thread name is restored on every exit path.
     */
    @Override // java.lang.Runnable
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String originalName = currentThread.getName();
        currentThread.setName(originalName + "-" + RUNNABLE_NAME);
        try {
            while (true) {
                this.threadHealthReporter.reportHealth(RUNNABLE_NAME, SCHEDULED_DELAY, System.currentTimeMillis());
                try {
                    // Bounded wait so that health keeps being reported even when the queue is idle.
                    Object request = this.requestQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (request == null) {
                        continue;
                    }
                    if (request instanceof DataRulesEvaluationRequest) {
                        this.dataObserverRunner.handleDataRulesEvaluationRequest((DataRulesEvaluationRequest) request);
                    } else if (request instanceof RulesConfigurationChangeRequest) {
                        this.dataObserverRunner.handleConfigurationChangeRequest((RulesConfigurationChangeRequest) request);
                    } else if (request instanceof PipelineErrorNotificationRequest) {
                        this.dataObserverRunner.handlePipelineErrorNotificationRequest((PipelineErrorNotificationRequest) request);
                    } else {
                        LOG.error("Unknown request: {}", request.getClass().getName());
                    }
                } catch (InterruptedException e) {
                    LOG.debug("Stopping the Pipeline Observer, Reason: {}", e.toString(), e);
                    // Catching InterruptedException clears the flag; set it again for the thread owner.
                    currentThread.interrupt();
                    return;
                }
            }
        } finally {
            // Runs for both the interrupt-driven return and any unchecked exception from a handler.
            currentThread.setName(originalName);
        }
    }

    /**
     * Passes the metric registry JSON through to the underlying {@link DataObserverRunner}.
     *
     * @param metricRegistryJson value forwarded to the runner
     */
    public void setMetricRegistryJson(MetricRegistryJson metricRegistryJson) {
        this.dataObserverRunner.setMetricRegistryJson(metricRegistryJson);
    }

    /**
     * Returns sampled records obtained from the underlying {@link DataObserverRunner}.
     *
     * @param str key forwarded to the runner (presumably a rule id -- TODO confirm)
     * @param i count forwarded to the runner (presumably the maximum number of records -- TODO confirm)
     * @return whatever the runner returns for the given arguments
     */
    public List<SampledRecord> getSampledRecords(String str, int i) {
        return this.dataObserverRunner.getSampledRecords(str, i);
    }
}
